diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java index 9612b57fccab3..30af3bee160d8 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java @@ -114,8 +114,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -180,28 +182,28 @@ public ResultFetcher configureSession(OperationHandle handle, String statement) } Operation op = parsedOperations.get(0); - if (!(op instanceof SetOperation) - && !(op instanceof ResetOperation) - && !(op instanceof CreateOperation) - && !(op instanceof DropOperation) - && !(op instanceof UseOperation) - && !(op instanceof AlterOperation) - && !(op instanceof LoadModuleOperation) - && !(op instanceof UnloadModuleOperation) - && !(op instanceof AddJarOperation)) { + // Use LinkedHashMap to preserve insertion order + Map, String> ops = new LinkedHashMap<>(); + ops.put(SetOperation.class, "SET"); + ops.put(ResetOperation.class, "RESET"); + ops.put( + CreateOperation.class, + "CREATE TABLE, CREATE DATABASE, CREATE FUNCTION, CREATE CATALOG, CREATE VIEW"); + ops.put( + DropOperation.class, + "DROP TABLE, DROP DATABASE, DROP FUNCTION, DROP CATALOG, DROP VIEW"); + ops.put(AlterOperation.class, "ALTER TABLE, ALTER DATABASE, ALTER FUNCTION"); + ops.put(UseOperation.class, "USE CATALOG, USE [CATALOG.]DATABASE, USE MODULE"); + ops.put(LoadModuleOperation.class, "LOAD MODULE"); + ops.put(UnloadModuleOperation.class, "UNLOAD MODULE"); + ops.put(AddJarOperation.class, "ADD JAR"); + + if (ops.keySet().stream().noneMatch(c -> c.isInstance(op))) { throw new UnsupportedOperationException( String.format( - "Unsupported statement for configuring session:%s\n" - + "The configureSession API only supports to execute statement of type " - + "CREATE TABLE, DROP TABLE, ALTER TABLE, " - + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " - + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, " - + "CREATE CATALOG, DROP CATALOG, " - + "USE CATALOG, USE [CATALOG.]DATABASE, " - + "CREATE VIEW, DROP VIEW, " - + "LOAD MODULE, UNLOAD MODULE, USE MODULE, " - + "ADD JAR.", - statement)); + "Unsupported statement for configuring session: %s\n" + + "The configureSession API only supports executing statements of type %s.", + statement, String.join(", ", ops.values()))); } if (op instanceof SetOperation) { @@ -290,8 +292,8 @@ public Set listDatabases(String catalogName) { public Set listTables( String catalogName, String databaseName, Set tableKinds) { checkArgument( - Arrays.asList(TableKind.TABLE, TableKind.VIEW).containsAll(tableKinds), - "Currently only support to list TABLE, VIEW or TABLE AND VIEW."); + EnumSet.of(TableKind.TABLE, TableKind.VIEW).containsAll(tableKinds), + "Currently only supports listing TABLE, VIEW or TABLE AND VIEW."); if (tableKinds.contains(TableKind.TABLE) && tableKinds.contains(TableKind.VIEW)) { return listTables(catalogName, databaseName, true); } else if (tableKinds.contains(TableKind.TABLE)) { @@ -608,7 +610,7 @@ private ResultFetcher callSetOperation( // set a property sessionContext.set(setOp.getKey().get().trim(), setOp.getValue().get().trim()); return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false); - } else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) { + } else if (setOp.getKey().isEmpty() && setOp.getValue().isEmpty()) { // show all properties Map configMap = tableEnv.getConfig().getConfiguration().toMap(); return ResultFetcher.fromResults( @@ -743,23 +745,21 @@ private Set listTables( ObjectIdentifier.of( catalogName, databaseName, name), TableKind.TABLE))); - return Collections.unmodifiableSet(new HashSet<>(ans.values())); + return ans.values().stream().collect(Collectors.toUnmodifiableSet()); } private Set listViews(String catalogName, String databaseName) { - return Collections.unmodifiableSet( - sessionContext - .getSessionState() - .catalogManager - .listViews(catalogName, databaseName) - .stream() - .map( - name -> - new TableInfo( - ObjectIdentifier.of( - catalogName, databaseName, name), - TableKind.VIEW)) - .collect(Collectors.toSet())); + return sessionContext + .getSessionState() + .catalogManager + .listViews(catalogName, databaseName) + .stream() + .map( + name -> + new TableInfo( + ObjectIdentifier.of(catalogName, databaseName, name), + TableKind.VIEW)) + .collect(Collectors.toUnmodifiableSet()); } public ResultFetcher callStopJobOperation( @@ -890,7 +890,7 @@ public ResultFetcher callDescribeJobOperation( } }); - if (!jobStatusOp.isPresent()) { + if (jobStatusOp.isEmpty()) { throw new SqlExecutionException( String.format("Described job %s does not exist in the cluster.", jobId)); } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java index e2536b0447b8b..f6a03f41b7444 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java @@ -981,13 +981,16 @@ void testConfigureSessionWithIllegalStatement() { .satisfies( FlinkAssertions.anyCauseMatches( UnsupportedOperationException.class, - "Unsupported statement for configuring session:SELECT 1;\n" - + "The configureSession API only supports to execute statement of type " - + "CREATE TABLE, DROP TABLE, ALTER TABLE, " - + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " - + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, " - + "CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, " - + "CREATE VIEW, DROP VIEW, LOAD MODULE, UNLOAD MODULE, USE MODULE, ADD JAR.")); + "Unsupported statement for configuring session: SELECT 1;\n" + + "The configureSession API only supports executing statements of type " + + "SET, RESET, " + + "CREATE TABLE, CREATE DATABASE, CREATE FUNCTION, CREATE CATALOG, CREATE VIEW, " + + "DROP TABLE, DROP DATABASE, DROP FUNCTION, DROP CATALOG, DROP VIEW, " + + "ALTER TABLE, ALTER DATABASE, ALTER FUNCTION, " + + "USE CATALOG, USE [CATALOG.]DATABASE, USE MODULE, " + + "LOAD MODULE, " + + "UNLOAD MODULE, " + + "ADD JAR")); } @Test