Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[remove datanode] Add Remove DataNode SQL #14678

Merged
merged 9 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.itbase.exception.InconsistentDataException;
import org.apache.iotdb.jdbc.IoTDBSQLException;
import org.apache.iotdb.relational.it.query.old.aligned.TableUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
Expand All @@ -42,6 +44,7 @@

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.List;
Expand All @@ -57,7 +60,7 @@
public class IoTDBRemoveDataNodeITFramework {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBRemoveDataNodeITFramework.class);
private static final String INSERTION1 =
private static final String TREE_MODEL_INSERTION =
"INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)";

private static final String SHOW_REGIONS = "show regions";
Expand All @@ -66,6 +69,10 @@ public class IoTDBRemoveDataNodeITFramework {
private static final String defaultSchemaRegionGroupExtensionPolicy = "CUSTOM";
private static final String defaultDataRegionGroupExtensionPolicy = "CUSTOM";

public static final int NOT_USE_SQL = 0;
OneSizeFitsQuorum marked this conversation as resolved.
Show resolved Hide resolved
public static final int TREE_MODEL_SQL = 1;
public static final int TABLE_MODEL_SQL = 3;

@Before
public void setUp() throws Exception {
EnvFactory.getEnv()
Expand All @@ -90,7 +97,8 @@ public void successTest(
final int dataNodeNum,
final int removeDataNodeNum,
final int dataRegionPerDataNode,
final boolean rejoinRemovedDataNode)
final boolean rejoinRemovedDataNode,
final int SQLType)
throws Exception {
testRemoveDataNode(
dataReplicateFactor,
Expand All @@ -100,7 +108,8 @@ public void successTest(
removeDataNodeNum,
dataRegionPerDataNode,
true,
rejoinRemovedDataNode);
rejoinRemovedDataNode,
SQLType);
}

public void failTest(
Expand All @@ -110,7 +119,8 @@ public void failTest(
final int dataNodeNum,
final int removeDataNodeNum,
final int dataRegionPerDataNode,
final boolean rejoinRemovedDataNode)
final boolean rejoinRemovedDataNode,
final int SQLType)
throws Exception {
testRemoveDataNode(
dataReplicateFactor,
Expand All @@ -120,7 +130,8 @@ public void failTest(
removeDataNodeNum,
dataRegionPerDataNode,
false,
rejoinRemovedDataNode);
rejoinRemovedDataNode,
SQLType);
}

public void testRemoveDataNode(
Expand All @@ -131,7 +142,8 @@ public void testRemoveDataNode(
final int removeDataNodeNum,
final int dataRegionPerDataNode,
final boolean expectRemoveSuccess,
final boolean rejoinRemovedDataNode)
final boolean rejoinRemovedDataNode,
final int SQLType)
throws Exception {
// Set up the environment
EnvFactory.getEnv()
Expand All @@ -143,13 +155,18 @@ public void testRemoveDataNode(
dataRegionPerDataNode * dataNodeNum / dataReplicateFactor);
EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);

try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
try (final Connection connection = makeItCloseQuietly(getConnectionWithSQLType(SQLType));
final Statement statement = makeItCloseQuietly(connection.createStatement());
SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {

// Insert data
statement.execute(INSERTION1);
if (SQLType == TABLE_MODEL_SQL) {
// Insert data in table model
TableUtils.insertData();
} else {
// Insert data in tree model
statement.execute(TREE_MODEL_INSERTION);
}

Map<Integer, Set<Integer>> regionMap = getDataRegionMap(statement);
regionMap.forEach(
Expand Down Expand Up @@ -187,21 +204,38 @@ public void testRemoveDataNode(
.map(TDataNodeConfiguration::getLocation)
.filter(location -> removeDataNodes.contains(location.getDataNodeId()))
.collect(Collectors.toList());
TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(removeDataNodeLocations);

// Remove data nodes
TDataNodeRemoveResp removeResp = clientRef.get().removeDataNode(removeReq);
LOGGER.info("Submit Remove DataNodes result {} ", removeResp);
if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (expectRemoveSuccess) {
LOGGER.error("Submit Remove DataNodes fail");
Assert.fail();
} else {
LOGGER.info("Submit Remove DataNodes fail, as expected");
return;
if (SQLType != NOT_USE_SQL) {
String removeDataNodeSQL = generateRemoveString(removeDataNodes);
LOGGER.info("Remove DataNodes SQL: {}", removeDataNodeSQL);
try {
statement.execute(removeDataNodeSQL);
} catch (IoTDBSQLException e) {
if (expectRemoveSuccess) {
LOGGER.error("Remove DataNodes SQL execute fail: {}", e.getMessage());
Assert.fail();
} else {
LOGGER.info("Submit Remove DataNodes fail, as expected");
return;
}
}
LOGGER.info("Remove DataNodes SQL submit successfully.");
} else {
TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(removeDataNodeLocations);

// Remove data nodes
TDataNodeRemoveResp removeResp = clientRef.get().removeDataNode(removeReq);
LOGGER.info("Submit Remove DataNodes result {} ", removeResp);
if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (expectRemoveSuccess) {
LOGGER.error("Submit Remove DataNodes fail");
Assert.fail();
} else {
LOGGER.info("Submit Remove DataNodes fail, as expected.");
return;
}
}
LOGGER.info("Submit Remove DataNodes request: {}", removeReq);
}
LOGGER.info("Submit Remove DataNodes request: {}", removeReq);

// Wait until success
boolean removeSuccess = false;
Expand Down Expand Up @@ -363,4 +397,24 @@ public void restartDataNodes(List<DataNodeWrapper> dataNodeWrappers) {
LOGGER.info("Node {} restarted.", nodeWrapper.getId());
});
}

public static String generateRemoveString(Set<Integer> dataNodes) {
StringBuilder sb = new StringBuilder("remove datanode ");

for (Integer node : dataNodes) {
sb.append(node).append(", ");
}

sb.setLength(sb.length() - 2);

return sb.toString();
}

public Connection getConnectionWithSQLType(int SQLType) throws SQLException {
if (SQLType == TABLE_MODEL_SQL) {
return EnvFactory.getEnv().getTableConnection();
} else {
return EnvFactory.getEnv().getConnection();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,34 @@
@Category({ClusterIT.class})
@RunWith(IoTDBTestRunner.class)
public class IoTDBRemoveDataNodeNormalIT extends IoTDBRemoveDataNodeITFramework {

@Test
public void success1C4DTest() throws Exception {
successTest(2, 3, 1, 4, 1, 2, true);
successTest(2, 3, 1, 4, 1, 2, true, IoTDBRemoveDataNodeITFramework.NOT_USE_SQL);
}

@Test
public void fail1C3DTest() throws Exception {
failTest(2, 3, 1, 3, 1, 2, false);
failTest(2, 3, 1, 3, 1, 2, false, IoTDBRemoveDataNodeITFramework.NOT_USE_SQL);
}

@Test
public void success1C4DTestUseSQL() throws Exception {
successTest(2, 3, 1, 4, 1, 2, true, IoTDBRemoveDataNodeITFramework.TREE_MODEL_SQL);
}

@Test
public void fail1C3DTestUseSQL() throws Exception {
failTest(2, 3, 1, 3, 1, 2, false, IoTDBRemoveDataNodeITFramework.TREE_MODEL_SQL);
}

@Test
public void success1C4DTestUseTableSQL() throws Exception {
successTest(2, 3, 1, 4, 1, 2, true, IoTDBRemoveDataNodeITFramework.TABLE_MODEL_SQL);
}

@Test
public void fail1C3DTestUseTableSQL() throws Exception {
failTest(2, 3, 1, 3, 1, 2, false, IoTDBRemoveDataNodeITFramework.TABLE_MODEL_SQL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ keyWords
| DATA_REGION_GROUP_NUM
| DATABASE
| DATABASES
| DATANODE
| DATANODEID
| DATANODES
| DATASET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ ddlStatement
// Cluster
| showVariables | showCluster | showRegions | showDataNodes | showConfigNodes | showClusterId
| getRegionId | getTimeSlotList | countTimeSlotList | getSeriesSlotList
| migrateRegion | reconstructRegion | extendRegion | removeRegion
| migrateRegion | reconstructRegion | extendRegion | removeRegion | removeDataNode
| verifyConnection
// AINode
| showAINodes | createModel | dropModel | showModels | callInference
Expand Down Expand Up @@ -550,6 +550,11 @@ verifyConnection
: VERIFY CONNECTION (DETAILS)?
;

// ---- Remove DataNode
removeDataNode
: REMOVE DATANODE dataNodeId=INTEGER_LITERAL (COMMA dataNodeId=INTEGER_LITERAL)*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove datanodes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both "datanode" and "datanodes" make sense in this context, so I’d prefer to leave it as is for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with current implementation because we use remove-datanode.sh

;

// Pipe Task =========================================================================================
createPipe
: CREATE PIPE (IF NOT EXISTS)? pipeName=identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ DATABASES
: D A T A B A S E S
;

DATANODE
: D A T A N O D E
;

DATANODEID
: D A T A N O D E I D
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.KillQuery;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeStatement;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetConfiguration;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetProperties;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowAINodes;
Expand Down Expand Up @@ -407,6 +408,7 @@ private IQueryExecution createQueryExecutionForTableModel(
|| statement instanceof StartRepairData
|| statement instanceof StopRepairData
|| statement instanceof PipeStatement
|| statement instanceof RemoveDataNode
|| statement instanceof SubscriptionStatement
|| statement instanceof ShowCurrentSqlDialect
|| statement instanceof ShowCurrentUser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CreatePipePluginTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropFunctionTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropPipePluginTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveDataNodeTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterIdTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowFunctionsTask;
Expand Down Expand Up @@ -120,6 +121,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RenameColumn;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RenameTable;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetConfiguration;
Expand Down Expand Up @@ -151,6 +153,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite.StatementRewrite;
import org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundException;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement;
Expand Down Expand Up @@ -351,6 +354,18 @@ protected IConfigTask visitShowRegions(
return new ShowRegionTask(treeStatement, true);
}

@Override
protected IConfigTask visitRemoveDataNode(
final RemoveDataNode removeDataNode, final MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
accessControl.checkUserHasMaintainPrivilege(context.getSession().getUserName());
// As the implementation is identical, we'll simply translate to the
// corresponding tree-model variant and execute that.
final RemoveDataNodeStatement treeStatement =
new RemoveDataNodeStatement(removeDataNode.getNodeIds());
return new RemoveDataNodeTask(treeStatement);
}

@Override
protected IConfigTask visitShowDataNodes(
final ShowDataNodes showDataNodesStatement, final MPPQueryContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetRegionIdTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetSeriesSlotListTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetTimeSlotListTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveDataNodeTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.SetTTLTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowAINodesTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterDetailsTask;
Expand Down Expand Up @@ -117,6 +118,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterIdStatement;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement;
Expand Down Expand Up @@ -652,6 +654,12 @@ public IConfigTask visitRemoveRegion(
return new RemoveRegionTask(removeRegionStatement);
}

@Override
public IConfigTask visitRemoveDataNode(
RemoveDataNodeStatement removeDataNodeStatement, MPPQueryContext context) {
return new RemoveDataNodeTask(removeDataNodeStatement);
}

@Override
public IConfigTask visitCreateContinuousQuery(
CreateContinuousQueryStatement createContinuousQueryStatement, MPPQueryContext context) {
Expand Down
Loading
Loading