Skip to content

Commit de47319

Browse files
author
dujie
committed
[FLINK-37827][table] Fix use SHOW CREATE TABLE for MATERIALIZED_TABLE
1 parent 5eeb2fc commit de47319

File tree

2 files changed

+222
-11
lines changed

2 files changed

+222
-11
lines changed

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java

Lines changed: 83 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import org.apache.flink.table.api.TableException;
2323
import org.apache.flink.table.catalog.CatalogBaseTable;
2424
import org.apache.flink.table.catalog.CatalogDescriptor;
25+
import org.apache.flink.table.catalog.CatalogMaterializedTable;
2526
import org.apache.flink.table.catalog.CatalogView;
2627
import org.apache.flink.table.catalog.Column;
2728
import org.apache.flink.table.catalog.ObjectIdentifier;
2829
import org.apache.flink.table.catalog.QueryOperationCatalogView;
2930
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
31+
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
3032
import org.apache.flink.table.catalog.ResolvedCatalogModel;
3133
import org.apache.flink.table.catalog.ResolvedCatalogTable;
3234
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -87,6 +89,9 @@ public static String buildShowCreateTableRow(
8789
String.format(
8890
"SHOW CREATE TABLE is only supported for tables, but %s is a view. Please use SHOW CREATE VIEW instead.",
8991
tableIdentifier.asSerializableString()));
92+
} else if (table.getTableKind() == CatalogBaseTable.TableKind.MATERIALIZED_TABLE) {
93+
return buildShowCreateMaterializedTableRow(
94+
(ResolvedCatalogMaterializedTable) table, tableIdentifier, isTemporary);
9095
}
9196
StringBuilder sb =
9297
new StringBuilder()
@@ -97,16 +102,30 @@ public static String buildShowCreateTableRow(
97102
extractFormattedPrimaryKey(table, PRINT_INDENT)
98103
.ifPresent(pk -> sb.append(",\n").append(pk));
99104
sb.append("\n)\n");
100-
extractComment(table).ifPresent(c -> sb.append(formatComment(c)).append("\n"));
101-
extractFormattedDistributedInfo((ResolvedCatalogTable) table).ifPresent(sb::append);
102-
extractFormattedPartitionedInfo((ResolvedCatalogTable) table)
103-
.ifPresent(
104-
partitionedInfoFormatted ->
105-
sb.append("PARTITIONED BY (")
106-
.append(partitionedInfoFormatted)
107-
.append(")\n"));
108-
extractFormattedOptions(table.getOptions(), PRINT_INDENT)
109-
.ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n"));
105+
106+
appendCommonTableElements(sb, table);
107+
return sb.toString();
108+
}
109+
110+
private static String buildShowCreateMaterializedTableRow(
111+
ResolvedCatalogMaterializedTable table,
112+
ObjectIdentifier tableIdentifier,
113+
boolean isTemporary) {
114+
StringBuilder sb =
115+
new StringBuilder()
116+
.append(
117+
buildCreateFormattedPrefix(
118+
"MATERIALIZED TABLE", isTemporary, tableIdentifier));
119+
120+
extractFormattedPrimaryKey(table, PRINT_INDENT)
121+
.ifPresent(pk -> sb.append("(").append(pk).append(")\n"));
122+
123+
appendCommonTableElements(sb, table);
124+
125+
sb.append(extractMaterializedTablefreshNess(table)).append("\n");
126+
extractMaterializedTableRefreshMode(table).ifPresent(mode -> sb.append(mode).append("\n"));
127+
128+
sb.append("AS ").append(table.getDefinitionQuery());
110129
return sb.toString();
111130
}
112131

@@ -151,7 +170,13 @@ public static String buildShowCreateCatalogRow(CatalogDescriptor catalogDescript
151170

152171
static String buildCreateFormattedPrefix(
153172
String type, boolean isTemporary, ObjectIdentifier identifier) {
154-
String postName = "model".equalsIgnoreCase(type) ? "" : " (";
173+
String postName;
174+
if ("model".equalsIgnoreCase(type) || "MATERIALIZED TABLE".equalsIgnoreCase(type)) {
175+
postName = "";
176+
} else {
177+
postName = " (";
178+
}
179+
155180
return String.format(
156181
"CREATE %s%s %s%s%s",
157182
isTemporary ? "TEMPORARY " : "",
@@ -269,6 +294,53 @@ static Optional<String> extractFormattedPartitionedInfo(ResolvedCatalogTable cat
269294
.collect(Collectors.joining(", ")));
270295
}
271296

297+
private static void appendCommonTableElements(
298+
StringBuilder sb, ResolvedCatalogBaseTable<?> table) {
299+
300+
extractComment(table).ifPresent(c -> sb.append(formatComment(c)).append("\n"));
301+
302+
if (table instanceof ResolvedCatalogTable) {
303+
extractFormattedDistributedInfo((ResolvedCatalogTable) table).ifPresent(sb::append);
304+
}
305+
306+
Optional<String> partitionedInfo;
307+
if (table instanceof ResolvedCatalogMaterializedTable) {
308+
partitionedInfo =
309+
extractFormattedPartitionedInfo((ResolvedCatalogMaterializedTable) table);
310+
} else {
311+
partitionedInfo = extractFormattedPartitionedInfo((ResolvedCatalogTable) table);
312+
}
313+
partitionedInfo.ifPresent(info -> sb.append("PARTITIONED BY (").append(info).append(")\n"));
314+
315+
extractFormattedOptions(table.getOptions(), PRINT_INDENT)
316+
.ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n"));
317+
}
318+
319+
static Optional<String> extractFormattedPartitionedInfo(
320+
ResolvedCatalogMaterializedTable catalogTable) {
321+
if (!catalogTable.isPartitioned()) {
322+
return Optional.empty();
323+
}
324+
return Optional.of(
325+
catalogTable.getPartitionKeys().stream()
326+
.map(EncodingUtils::escapeIdentifier)
327+
.collect(Collectors.joining(", ")));
328+
}
329+
330+
static String extractMaterializedTablefreshNess(ResolvedCatalogMaterializedTable table) {
331+
return String.format("FRESHNESS = %s ", table.getDefinitionFreshness().toString());
332+
}
333+
334+
static Optional<String> extractMaterializedTableRefreshMode(
335+
ResolvedCatalogMaterializedTable table) {
336+
CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode =
337+
table.getLogicalRefreshMode();
338+
if (logicalRefreshMode == CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC) {
339+
return Optional.empty();
340+
}
341+
return Optional.of(String.format("REFRESH_MODE = %s", table.getRefreshMode().name()));
342+
}
343+
272344
static Optional<String> extractFormattedOptions(Map<String, String> conf, String printIndent) {
273345
if (Objects.isNull(conf) || conf.isEmpty()) {
274346
return Optional.empty();

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,18 @@
2222
import org.apache.flink.table.api.DataTypes;
2323
import org.apache.flink.table.api.Schema;
2424
import org.apache.flink.table.catalog.CatalogDescriptor;
25+
import org.apache.flink.table.catalog.CatalogMaterializedTable;
2526
import org.apache.flink.table.catalog.CatalogTable;
2627
import org.apache.flink.table.catalog.CatalogView;
2728
import org.apache.flink.table.catalog.Column;
29+
import org.apache.flink.table.catalog.IntervalFreshness;
2830
import org.apache.flink.table.catalog.ObjectIdentifier;
31+
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
2932
import org.apache.flink.table.catalog.ResolvedCatalogTable;
3033
import org.apache.flink.table.catalog.ResolvedCatalogView;
3134
import org.apache.flink.table.catalog.ResolvedSchema;
3235
import org.apache.flink.table.catalog.TableDistribution;
36+
import org.apache.flink.table.catalog.UniqueConstraint;
3337
import org.apache.flink.table.expressions.DefaultSqlFactory;
3438

3539
import org.junit.jupiter.params.ParameterizedTest;
@@ -61,6 +65,16 @@ public class ShowCreateUtilTest {
6165
Column.physical("id", DataTypes.INT()),
6266
Column.physical("name", DataTypes.STRING()));
6367

68+
private static final ResolvedSchema THREE_COLUMNS_PK_SCHEMA =
69+
new ResolvedSchema(
70+
Arrays.asList(
71+
Column.physical("id", DataTypes.INT()),
72+
Column.physical("name", DataTypes.STRING()),
73+
Column.physical("birthday", DataTypes.STRING())),
74+
Collections.emptyList(),
75+
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
76+
;
77+
6478
@ParameterizedTest(name = "{index}: {1}")
6579
@MethodSource("argsForShowCreateTable")
6680
void showCreateTable(ResolvedCatalogTable resolvedCatalogTable, String expected) {
@@ -70,6 +84,16 @@ void showCreateTable(ResolvedCatalogTable resolvedCatalogTable, String expected)
7084
assertThat(createTableString).isEqualTo(expected);
7185
}
7286

87+
@ParameterizedTest(name = "{index}: {1}")
88+
@MethodSource("argsForShowCreateMaterializedTable")
89+
void showCreateMaterializedTable(
90+
ResolvedCatalogMaterializedTable resolvedCatalogTable, String expected) {
91+
final String createTableString =
92+
ShowCreateUtil.buildShowCreateTableRow(
93+
resolvedCatalogTable, TABLE_IDENTIFIER, false, DefaultSqlFactory.INSTANCE);
94+
assertThat(createTableString).isEqualTo(expected);
95+
}
96+
7397
@ParameterizedTest(name = "{index}: {1}")
7498
@MethodSource("argsForShowCreateView")
7599
void showCreateView(ResolvedCatalogView resolvedCatalogView, String expected) {
@@ -241,6 +265,96 @@ private static Collection<Arguments> argsForShowCreateTable() {
241265
return argList;
242266
}
243267

268+
private static Collection<Arguments> argsForShowCreateMaterializedTable() {
269+
Collection<Arguments> argList = new ArrayList<>();
270+
271+
final Map<String, String> options = new HashMap<>();
272+
options.put("option_key_a", "option_value_a");
273+
options.put("option_key_b", "option_value_b");
274+
options.put("option_key_c", "option_value_c");
275+
276+
argList.add(
277+
Arguments.of(
278+
createResolvedCatalogMaterializedTable(
279+
THREE_COLUMNS_PK_SCHEMA,
280+
options,
281+
List.of("birthday"),
282+
"table comment",
283+
"( SELECT id, birthday, name\n"
284+
+ " FROM json_source\n"
285+
+ ") AS tmp",
286+
CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS,
287+
CatalogMaterializedTable.RefreshMode.CONTINUOUS),
288+
"CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`tableName`\n"
289+
+ "( CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED)\n"
290+
+ "COMMENT 'table comment'\n"
291+
+ "PARTITIONED BY (`birthday`)\n"
292+
+ "WITH (\n"
293+
+ " 'option_key_a' = 'option_value_a',\n"
294+
+ " 'option_key_b' = 'option_value_b',\n"
295+
+ " 'option_key_c' = 'option_value_c'\n"
296+
+ ")\n"
297+
+ "FRESHNESS = INTERVAL '5' MINUTE \n"
298+
+ "REFRESH_MODE = CONTINUOUS\n"
299+
+ "AS ( SELECT id, birthday, name\n"
300+
+ " FROM json_source\n"
301+
+ ") AS tmp"));
302+
303+
argList.add(
304+
Arguments.of(
305+
createResolvedCatalogMaterializedTable(
306+
THREE_COLUMNS_PK_SCHEMA,
307+
options,
308+
List.of("birthday"),
309+
"table comment",
310+
"( SELECT id, birthday, name\n"
311+
+ " FROM json_source\n"
312+
+ ") AS tmp",
313+
CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS,
314+
CatalogMaterializedTable.RefreshMode.FULL),
315+
"CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`tableName`\n"
316+
+ "( CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED)\n"
317+
+ "COMMENT 'table comment'\n"
318+
+ "PARTITIONED BY (`birthday`)\n"
319+
+ "WITH (\n"
320+
+ " 'option_key_a' = 'option_value_a',\n"
321+
+ " 'option_key_b' = 'option_value_b',\n"
322+
+ " 'option_key_c' = 'option_value_c'\n"
323+
+ ")\n"
324+
+ "FRESHNESS = INTERVAL '5' MINUTE \n"
325+
+ "REFRESH_MODE = FULL\n"
326+
+ "AS ( SELECT id, birthday, name\n"
327+
+ " FROM json_source\n"
328+
+ ") AS tmp"));
329+
330+
argList.add(
331+
Arguments.of(
332+
createResolvedCatalogMaterializedTable(
333+
THREE_COLUMNS_PK_SCHEMA,
334+
options,
335+
List.of("birthday"),
336+
"table comment",
337+
"( SELECT id, birthday, name\n"
338+
+ " FROM json_source\n"
339+
+ ") AS tmp",
340+
CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC,
341+
CatalogMaterializedTable.RefreshMode.CONTINUOUS),
342+
"CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`tableName`\n"
343+
+ "( CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED)\n"
344+
+ "COMMENT 'table comment'\n"
345+
+ "PARTITIONED BY (`birthday`)\n"
346+
+ "WITH (\n"
347+
+ " 'option_key_a' = 'option_value_a',\n"
348+
+ " 'option_key_b' = 'option_value_b',\n"
349+
+ " 'option_key_c' = 'option_value_c'\n"
350+
+ ")\n"
351+
+ "FRESHNESS = INTERVAL '5' MINUTE \n"
352+
+ "AS ( SELECT id, birthday, name\n"
353+
+ " FROM json_source\n"
354+
+ ") AS tmp"));
355+
return argList;
356+
}
357+
244358
private static ResolvedCatalogTable createResolvedTable(
245359
ResolvedSchema resolvedSchema,
246360
Map<String, String> options,
@@ -273,4 +387,29 @@ private static ResolvedCatalogView createResolvedView(
273387
Collections.emptyMap()),
274388
resolvedSchema);
275389
}
390+
391+
private static ResolvedCatalogMaterializedTable createResolvedCatalogMaterializedTable(
392+
ResolvedSchema resolvedSchema,
393+
Map<String, String> options,
394+
List<String> partitionKeys,
395+
String comment,
396+
String query,
397+
CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode,
398+
CatalogMaterializedTable.RefreshMode refreshMode) {
399+
400+
CatalogMaterializedTable.Builder materializedTableBuilder =
401+
CatalogMaterializedTable.newBuilder()
402+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
403+
.comment(comment)
404+
.partitionKeys(partitionKeys)
405+
.options(options)
406+
.definitionQuery(query)
407+
.freshness(IntervalFreshness.ofMinute("5"))
408+
.logicalRefreshMode(logicalRefreshMode)
409+
.refreshMode(refreshMode)
410+
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING);
411+
412+
return new ResolvedCatalogMaterializedTable(
413+
materializedTableBuilder.build(), resolvedSchema);
414+
}
276415
}

0 commit comments

Comments
 (0)