Skip to content

[FLINK-38087][sql-gateway] Improve OperationExecutor error messages and Java stream usage #26781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -180,28 +182,28 @@ public ResultFetcher configureSession(OperationHandle handle, String statement)
}
Operation op = parsedOperations.get(0);

if (!(op instanceof SetOperation)
&& !(op instanceof ResetOperation)
&& !(op instanceof CreateOperation)
&& !(op instanceof DropOperation)
&& !(op instanceof UseOperation)
&& !(op instanceof AlterOperation)
&& !(op instanceof LoadModuleOperation)
&& !(op instanceof UnloadModuleOperation)
&& !(op instanceof AddJarOperation)) {
// Use LinkedHashMap to preserve insertion order
Map<Class<?>, String> ops = new LinkedHashMap<>();
ops.put(SetOperation.class, "SET");
ops.put(ResetOperation.class, "RESET");
ops.put(
CreateOperation.class,
"CREATE TABLE, CREATE DATABASE, CREATE FUNCTION, CREATE CATALOG, CREATE VIEW");
ops.put(
DropOperation.class,
"DROP TABLE, DROP DATABASE, DROP FUNCTION, DROP CATALOG, DROP VIEW");
ops.put(AlterOperation.class, "ALTER TABLE, ALTER DATABASE, ALTER FUNCTION");
ops.put(UseOperation.class, "USE CATALOG, USE [CATALOG.]DATABASE, USE MODULE");
ops.put(LoadModuleOperation.class, "LOAD MODULE");
ops.put(UnloadModuleOperation.class, "UNLOAD MODULE");
ops.put(AddJarOperation.class, "ADD JAR");

if (ops.keySet().stream().noneMatch(c -> c.isInstance(op))) {
throw new UnsupportedOperationException(
String.format(
"Unsupported statement for configuring session:%s\n"
+ "The configureSession API only supports to execute statement of type "
+ "CREATE TABLE, DROP TABLE, ALTER TABLE, "
+ "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
+ "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
+ "CREATE CATALOG, DROP CATALOG, "
+ "USE CATALOG, USE [CATALOG.]DATABASE, "
+ "CREATE VIEW, DROP VIEW, "
+ "LOAD MODULE, UNLOAD MODULE, USE MODULE, "
+ "ADD JAR.",
statement));
"Unsupported statement for configuring session: %s\n"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Was this PR generated with the AI assistant?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question. And thanks for reviewing!

I started with a simple fix of the incomplete error message and later a few ad hoc Java improvements when I read the code for the patch. Those are simple to implement so I didn't use AI tooling.

When addressing the specific comment of putting things in a variable, I found it is not straightforward to automatically "infer" the error message from all supported operations. I asked AI assistant and it gave me a much bigger change by defining "getting operation name" for all operations. I think that is too much for just error message constructing. So I went with the current approach , which is still better than existing code as it couples the supported operations with the error message by a single local variable.

+ "The configureSession API only supports executing statements of type %s.",
statement, String.join(", ", ops.values())));
}

if (op instanceof SetOperation) {
Expand Down Expand Up @@ -290,8 +292,8 @@ public Set<String> listDatabases(String catalogName) {
public Set<TableInfo> listTables(
String catalogName, String databaseName, Set<TableKind> tableKinds) {
checkArgument(
Arrays.asList(TableKind.TABLE, TableKind.VIEW).containsAll(tableKinds),
"Currently only support to list TABLE, VIEW or TABLE AND VIEW.");
EnumSet.of(TableKind.TABLE, TableKind.VIEW).containsAll(tableKinds),
"Currently only supports listing TABLE, VIEW or TABLE AND VIEW.");
if (tableKinds.contains(TableKind.TABLE) && tableKinds.contains(TableKind.VIEW)) {
return listTables(catalogName, databaseName, true);
} else if (tableKinds.contains(TableKind.TABLE)) {
Expand Down Expand Up @@ -608,7 +610,7 @@ private ResultFetcher callSetOperation(
// set a property
sessionContext.set(setOp.getKey().get().trim(), setOp.getValue().get().trim());
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
} else if (!setOp.getKey().isPresent() && !setOp.getValue().isPresent()) {
} else if (setOp.getKey().isEmpty() && setOp.getValue().isEmpty()) {
// show all properties
Map<String, String> configMap = tableEnv.getConfig().getConfiguration().toMap();
return ResultFetcher.fromResults(
Expand Down Expand Up @@ -743,23 +745,21 @@ private Set<TableInfo> listTables(
ObjectIdentifier.of(
catalogName, databaseName, name),
TableKind.TABLE)));
return Collections.unmodifiableSet(new HashSet<>(ans.values()));
return ans.values().stream().collect(Collectors.toUnmodifiableSet());
}

private Set<TableInfo> listViews(String catalogName, String databaseName) {
return Collections.unmodifiableSet(
sessionContext
.getSessionState()
.catalogManager
.listViews(catalogName, databaseName)
.stream()
.map(
name ->
new TableInfo(
ObjectIdentifier.of(
catalogName, databaseName, name),
TableKind.VIEW))
.collect(Collectors.toSet()));
return sessionContext
.getSessionState()
.catalogManager
.listViews(catalogName, databaseName)
.stream()
.map(
name ->
new TableInfo(
ObjectIdentifier.of(catalogName, databaseName, name),
TableKind.VIEW))
.collect(Collectors.toUnmodifiableSet());
}

public ResultFetcher callStopJobOperation(
Expand Down Expand Up @@ -890,7 +890,7 @@ public ResultFetcher callDescribeJobOperation(
}
});

if (!jobStatusOp.isPresent()) {
if (jobStatusOp.isEmpty()) {
throw new SqlExecutionException(
String.format("Described job %s does not exist in the cluster.", jobId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,13 +981,16 @@ void testConfigureSessionWithIllegalStatement() {
.satisfies(
FlinkAssertions.anyCauseMatches(
UnsupportedOperationException.class,
"Unsupported statement for configuring session:SELECT 1;\n"
+ "The configureSession API only supports to execute statement of type "
+ "CREATE TABLE, DROP TABLE, ALTER TABLE, "
+ "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
+ "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
+ "CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, "
+ "CREATE VIEW, DROP VIEW, LOAD MODULE, UNLOAD MODULE, USE MODULE, ADD JAR."));
"Unsupported statement for configuring session: SELECT 1;\n"
+ "The configureSession API only supports executing statements of type "
+ "SET, RESET, "
+ "CREATE TABLE, CREATE DATABASE, CREATE FUNCTION, CREATE CATALOG, CREATE VIEW, "
+ "DROP TABLE, DROP DATABASE, DROP FUNCTION, DROP CATALOG, DROP VIEW, "
+ "ALTER TABLE, ALTER DATABASE, ALTER FUNCTION, "
+ "USE CATALOG, USE [CATALOG.]DATABASE, USE MODULE, "
+ "LOAD MODULE, "
+ "UNLOAD MODULE, "
+ "ADD JAR"));
}

@Test
Expand Down