Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,11 @@
if (openSessionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
final String errorMsg =
String.format(
"Failed to login to receiver %s:%s for legacy pipe transfer because %s",
ipAddress, port, openSessionResp.getStatus().getMessage());
"Failed to login to receiver %s:%s for legacy pipe transfer because code: %d, message: %s",

Check warning on line 281 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 105).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8RN8XkTwJMEW2kdJsZ&open=AZ8RN8XkTwJMEW2kdJsZ&pullRequest=18048
ipAddress,
port,
openSessionResp.getStatus().getCode(),
openSessionResp.getStatus().getMessage());
Comment on lines -281 to +285

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i18n

LOGGER.warn(errorMsg);
throw new PipeRuntimeCriticalException(errorMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private boolean onPipeConnectionException(final Throwable throwable) {
MAX_RETRY_TIMES,
e);
try {
sleepIfNoHighPriorityTask(retry * PipeConfig.getInstance().getPipeSinkRetryIntervalMs());
sleepIfNoHighPriorityTask(getHandshakeRetrySleepInterval(e, retry));
} catch (final InterruptedException interruptedException) {
LOGGER.info(
PipeMessages.INTERRUPTED_WHILE_SLEEPING_RETRY_HANDSHAKE, interruptedException);
Expand Down Expand Up @@ -218,6 +218,13 @@ private boolean onPipeConnectionException(final Throwable throwable) {
return false;
}

/**
 * Computes how long to sleep before the next handshake retry.
 *
 * <p>The base interval is {@code retry} multiplied by the configured pipe sink retry interval.
 * When {@link #isAuthenticationFailure(Throwable)} recognizes the throwable, the result is
 * raised to at least {@code AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS}.
 *
 * @param throwable the failure that triggered the retry
 * @param retry the current retry attempt number
 * @return the sleep interval in milliseconds
 */
private long getHandshakeRetrySleepInterval(final Throwable throwable, final int retry) {
  final long linearBackoffMs = retry * PipeConfig.getInstance().getPipeSinkRetryIntervalMs();
  if (!isAuthenticationFailure(throwable)) {
    return linearBackoffMs;
  }
  // Authentication failures never retry faster than the dedicated minimum interval.
  return Math.max(linearBackoffMs, AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS);
}

/**
* Submit a {@link PipeSubtask} to the executor to keep it running. Note that the function will be
* called when connector starts or the subTask finishes the last round; thus the {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,33 @@
import org.apache.iotdb.commons.i18n.PipeMessages;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.rpc.TSStatusCode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

public abstract class PipeReportableSubtask extends PipeSubtask {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeReportableSubtask.class);
private static final long DEFAULT_LOGIN_LOCK_WINDOW_MS = TimeUnit.MINUTES.toMillis(10);
private static final int DEFAULT_LOGIN_LOCK_FAILED_ATTEMPTS = 5;
private static final int AUTHENTICATION_FAILURE_IMMEDIATE_ATTEMPTS = 2;
protected static final long AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS =
DEFAULT_LOGIN_LOCK_WINDOW_MS
/ (DEFAULT_LOGIN_LOCK_FAILED_ATTEMPTS - AUTHENTICATION_FAILURE_IMMEDIATE_ATTEMPTS)
+ TimeUnit.SECONDS.toMillis(1);
private static final Pattern AUTHENTICATION_FAILURE_STATUS_CODE_PATTERN =
Pattern.compile(
String.format(
"(?i)(?:\\b(?:code|status code)\\s*[:=]\\s*(?:%d|%d)\\b|\\b(?:%d|%d):|\\b(?:WRONG_LOGIN_PASSWORD|USER_LOGIN_LOCKED)\\b)",

Check warning on line 52 in iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 135).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8RN8THTwJMEW2kdJsY&open=AZ8RN8THTwJMEW2kdJsY&pullRequest=18048
TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode(),
TSStatusCode.USER_LOGIN_LOCKED.getStatusCode(),
TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode(),
TSStatusCode.USER_LOGIN_LOCKED.getStatusCode()));
// To ensure that high-priority tasks can obtain object locks first, a counter is now used to save
// the number of high-priority tasks.
protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0);
Expand Down Expand Up @@ -65,7 +83,7 @@
// is dropped or the process is running normally.
}

private long getSleepIntervalBasedOnThrowable(final Throwable throwable) {
protected long getSleepIntervalBasedOnThrowable(final Throwable throwable) {
long sleepInterval = Math.min(1000L * retryCount.get(), 10000);
// if receiver is read-only/internal-error/write-reject, connector will retry with
// power-increasing interval
Expand All @@ -76,9 +94,24 @@
sleepInterval = 1000L * retryCount.get() * retryCount.get();
}
}
if (isAuthenticationFailure(throwable)) {
sleepInterval = Math.max(sleepInterval, AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS);
}
return sleepInterval;
}

/**
 * Tells whether the given throwable, or any throwable in its cause chain, has a message that
 * matches {@code AUTHENTICATION_FAILURE_STATUS_CODE_PATTERN}.
 *
 * @param throwable the head of the cause chain to inspect; {@code null} yields {@code false}
 * @return {@code true} if at least one message in the chain matches the pattern
 */
protected static boolean isAuthenticationFailure(final Throwable throwable) {
  for (Throwable cursor = throwable; cursor != null; cursor = cursor.getCause()) {
    final String text = cursor.getMessage();
    if (text == null) {
      // Nothing to match at this level; keep walking the cause chain.
      continue;
    }
    if (AUTHENTICATION_FAILURE_STATUS_CODE_PATTERN.matcher(text).find()) {
      return true;
    }
  }
  return false;
}

private void onReportEventFailure(final Throwable throwable) {
final int maxRetryTimes =
throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException
Expand Down Expand Up @@ -199,10 +232,13 @@
}

protected void sleepIfNoHighPriorityTask(long sleepMillis) throws InterruptedException {
if (sleepMillis <= 0) {
return;
}
synchronized (highPriorityLockTaskCount) {
// The wait operation will release the highPriorityLockTaskCount lock, so there will be
// no deadlock.
if (highPriorityLockTaskCount.get() > 0) {
if (highPriorityLockTaskCount.get() == 0) {
highPriorityLockTaskCount.wait(sleepMillis);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,11 @@ public void sendHandshakeReq(final Pair<IoTDBSyncClient, Boolean> clientAndStatu
client.getIpAddress(),
client.getPort(),
resp.getStatus());
endPoint2HandshakeErrorMessage.put(client.getEndPoint(), resp.getStatus().getMessage());
endPoint2HandshakeErrorMessage.put(
client.getEndPoint(),
String.format(
"code: %d, message: %s",
resp.getStatus().getCode(), resp.getStatus().getMessage()));
} else {
clientAndStatus.setRight(true);
client.setTimeout(CONNECTION_TIMEOUT_MS.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,48 @@
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

public class PipeSleepIntervalTest {
/**
 * Minimal concrete {@link PipeAbstractSinkSubtask} used by the tests in this class. The
 * abstract hooks are stubbed out, and thin package-visible wrappers forward to inherited
 * helpers so that test methods can call them.
 */
private static class TestSinkSubtask extends PipeAbstractSinkSubtask {

  TestSinkSubtask() {
    super(null, 0, null);
  }

  // ---- wrappers forwarding to inherited helpers ----

  /** Forwards to {@code getSleepIntervalBasedOnThrowable}. */
  long getSleepInterval(final Throwable cause) {
    return getSleepIntervalBasedOnThrowable(cause);
  }

  /** Forwards to {@code isAuthenticationFailure}. */
  boolean isAuthenticationFailureException(final Throwable cause) {
    return isAuthenticationFailure(cause);
  }

  /** Forwards to {@code sleepIfNoHighPriorityTask}. */
  void sleepWithoutHighPriorityTask(final long millis) throws InterruptedException {
    sleepIfNoHighPriorityTask(millis);
  }

  // ---- stubbed abstract hooks ----

  @Override
  protected boolean executeOnce() {
    return false;
  }

  @Override
  protected String getRootCause(Throwable throwable) {
    return null;
  }

  @Override
  protected void report(EnrichedEvent event, PipeRuntimeException exception) {
    // Intentionally empty: the tests do not exercise reporting.
  }
}

private long oldPipeSinkSubtaskSleepIntervalInitMs;
private long oldPipeSinkSubtaskSleepIntervalMaxMs;

Expand All @@ -53,21 +88,7 @@ public void tearDown() throws Exception {

@Test
public void test() {
try (final PipeAbstractSinkSubtask subtask =
new PipeAbstractSinkSubtask(null, 0, null) {
@Override
protected String getRootCause(Throwable throwable) {
return null;
}

@Override
protected void report(EnrichedEvent event, PipeRuntimeException exception) {}

@Override
protected boolean executeOnce() {
return false;
}
}) {
try (final TestSinkSubtask subtask = new TestSinkSubtask()) {
long startTime = System.currentTimeMillis();
subtask.sleep4NonReportException();
Assert.assertTrue(
Expand All @@ -80,4 +101,39 @@ protected boolean executeOnce() {
>= PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs());
}
}

/**
 * Checks which exception messages are classified as authentication failures, and that the
 * sleep interval computed for such a failure is longer than the one for a plain network error.
 */
@Test
public void testAuthenticationFailureRetryInterval() {
  try (final TestSinkSubtask subtask = new TestSinkSubtask()) {
    // Messages that are expected to be classified as authentication failures.
    final String[] authenticationFailureMessages = {
      "Handshake error with receiver 127.0.0.1:6667, code: 801, "
          + "message: Authentication failed.",
      "801: Failed to check password for pipe a2b.",
      "status code: 822, message: Account is blocked."
    };
    for (final String message : authenticationFailureMessages) {
      Assert.assertTrue(
          subtask.isAuthenticationFailureException(new PipeConnectionException(message)));
    }

    // A bare number that is not presented as a status code must not be classified.
    final PipeConnectionException unrelatedFailure =
        new PipeConnectionException("Network error 801 bytes sent.");
    Assert.assertFalse(subtask.isAuthenticationFailureException(unrelatedFailure));

    final long authRetryMs = subtask.getSleepInterval(new PipeConnectionException("code: 801"));
    final long threeMinutesMs = TimeUnit.MINUTES.toMillis(3);
    final long tenMinutesMs = TimeUnit.MINUTES.toMillis(10);
    Assert.assertTrue(authRetryMs > threeMinutesMs);
    Assert.assertTrue(authRetryMs * 3 > tenMinutesMs);

    final long networkRetryMs =
        subtask.getSleepInterval(new PipeConnectionException("network error"));
    Assert.assertTrue(networkRetryMs <= 10000);
  }
}

/**
 * Verifies that {@code sleepIfNoHighPriorityTask} actually blocks for roughly the requested
 * duration on a freshly created subtask.
 *
 * <p>Elapsed time is measured with {@link System#nanoTime()} rather than the wall clock, so a
 * system clock adjustment during the test cannot produce a bogus (even negative) duration.
 */
@Test
public void testSleepIfNoHighPriorityTaskWaits() throws Exception {
  try (final TestSinkSubtask subtask = new TestSinkSubtask()) {
    final long startNanos = System.nanoTime();
    subtask.sleepWithoutHighPriorityTask(20L);
    final long elapsedNanos = System.nanoTime() - startNanos;
    // Allow a small tolerance below the requested 20 ms for timer granularity.
    Assert.assertTrue(elapsedNanos >= TimeUnit.MILLISECONDS.toNanos(15L));
  }
}
}
Loading