Skip to content

Commit c3353d5

Browse files
committed
[Feature] support create/drop/query iceberg view for hive catalog
add ut
1 parent e52cb9e commit c3353d5

File tree

10 files changed

+241
-49
lines changed

10 files changed

+241
-49
lines changed

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,8 @@ public void renameTable(String dbName, String tblName, String newTblName) throws
188188
}
189189

190190
@Override
191-
public boolean createView(ConnectorViewDefinition connectorViewDefinition, boolean replace) {
192-
return delegate.createView(connectorViewDefinition, replace);
191+
public boolean createView(String catalogName, ConnectorViewDefinition connectorViewDefinition, boolean replace) {
192+
return delegate.createView(catalogName, connectorViewDefinition, replace);
193193
}
194194

195195
@Override

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergApiConverter.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package com.starrocks.connector.iceberg;
1616

17+
import com.google.common.base.Strings;
1718
import com.google.common.collect.Lists;
1819
import com.starrocks.catalog.ArrayType;
1920
import com.starrocks.catalog.Column;
@@ -25,8 +26,11 @@
2526
import com.starrocks.catalog.StructField;
2627
import com.starrocks.catalog.StructType;
2728
import com.starrocks.catalog.Type;
29+
import com.starrocks.connector.ConnectorViewDefinition;
2830
import com.starrocks.connector.exception.StarRocksConnectorException;
2931
import com.starrocks.connector.hive.RemoteFileInputFormat;
32+
import com.starrocks.qe.ConnectContext;
33+
import com.starrocks.server.GlobalStateMgr;
3034
import com.starrocks.thrift.TIcebergColumnStats;
3135
import com.starrocks.thrift.TIcebergDataFile;
3236
import com.starrocks.thrift.TIcebergSchema;
@@ -425,4 +429,24 @@ public static List<StructField> getPartitionColumns(List<PartitionField> fields,
425429
public static Namespace convertDbNameToNamespace(String dbName) {
426430
return Namespace.of(dbName.split("\\."));
427431
}
432+
433+
public static Map<String, String> buildViewProperties(ConnectorViewDefinition definition, String catalogName) {
434+
ConnectContext connectContext = ConnectContext.get();
435+
if (connectContext == null) {
436+
throw new StarRocksConnectorException("not found connect context when building iceberg view properties");
437+
}
438+
439+
String queryId = connectContext.getQueryId().toString();
440+
441+
Map<String, String> properties = com.google.common.collect.ImmutableMap.of(
442+
"queryId", queryId,
443+
"starrocksCatalog", catalogName,
444+
"starrocksVersion", GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf().getFeVersion());
445+
446+
if (!Strings.isNullOrEmpty(definition.getComment())) {
447+
properties.put(IcebergMetadata.COMMENT, definition.getComment());
448+
}
449+
450+
return properties;
451+
}
428452
}

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergCatalog.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.starrocks.connector.exception.StarRocksConnectorException;
2727
import com.starrocks.memory.MemoryTrackable;
2828
import org.apache.iceberg.FileScanTask;
29+
import org.apache.iceberg.MetadataTableType;
2930
import org.apache.iceberg.MetadataTableUtils;
3031
import org.apache.iceberg.PartitionSpec;
3132
import org.apache.iceberg.PartitionsTable;
@@ -35,10 +36,12 @@
3536
import org.apache.iceberg.Table;
3637
import org.apache.iceberg.TableScan;
3738
import org.apache.iceberg.catalog.Namespace;
39+
import org.apache.iceberg.catalog.TableIdentifier;
3840
import org.apache.iceberg.exceptions.NoSuchTableException;
3941
import org.apache.iceberg.io.CloseableIterable;
4042
import org.apache.iceberg.util.StructProjection;
4143
import org.apache.iceberg.view.View;
44+
import org.apache.iceberg.view.ViewBuilder;
4245
import org.apache.logging.log4j.LogManager;
4346
import org.apache.logging.log4j.Logger;
4447

@@ -49,6 +52,10 @@
4952
import java.util.Map;
5053
import java.util.concurrent.ExecutorService;
5154

55+
import static com.google.common.base.Preconditions.checkArgument;
56+
import static com.starrocks.connector.iceberg.IcebergApiConverter.buildViewProperties;
57+
import static com.starrocks.connector.iceberg.IcebergApiConverter.convertDbNameToNamespace;
58+
import static com.starrocks.connector.iceberg.IcebergMetadata.LOCATION_PROPERTY;
5259
import static org.apache.iceberg.StarRocksIcebergTableScan.newTableScanContext;
5360

5461
public interface IcebergCatalog extends MemoryTrackable {
@@ -99,7 +106,31 @@ default boolean tableExists(String dbName, String tableName) throws StarRocksCon
99106
}
100107
}
101108

102-
default boolean createView(ConnectorViewDefinition connectorViewDefinition, boolean replace) {
109+
default boolean createView(String catalogName, ConnectorViewDefinition connectorViewDefinition, boolean replace) {
110+
return createViewDefault(connectorViewDefinition.getDatabaseName(), connectorViewDefinition, replace);
111+
}
112+
113+
default boolean createViewDefault(String catalogName, ConnectorViewDefinition definition, boolean replace) {
114+
Schema schema = IcebergApiConverter.toIcebergApiSchema(definition.getColumns());
115+
Namespace ns = convertDbNameToNamespace(definition.getDatabaseName());
116+
ViewBuilder viewBuilder = getViewBuilder(TableIdentifier.of(ns, definition.getViewName()));
117+
viewBuilder = viewBuilder.withSchema(schema)
118+
.withQuery("starrocks", definition.getInlineViewDef())
119+
.withDefaultNamespace(ns)
120+
.withDefaultCatalog(definition.getCatalogName())
121+
.withProperties(buildViewProperties(definition, catalogName))
122+
.withLocation(defaultTableLocation(ns, definition.getViewName()));
123+
124+
if (replace) {
125+
viewBuilder.createOrReplace();
126+
} else {
127+
viewBuilder.create();
128+
}
129+
130+
return true;
131+
}
132+
133+
default ViewBuilder getViewBuilder(TableIdentifier identifier) {
103134
throw new StarRocksConnectorException("This catalog doesn't support creating views");
104135
}
105136

@@ -132,10 +163,18 @@ default StarRocksIcebergTableScan getTableScan(Table table, StarRocksIcebergTabl
132163
}
133164

134165
default String defaultTableLocation(Namespace ns, String tableName) {
135-
return "";
166+
Map<String, String> properties = loadNamespaceMetadata(ns);
167+
String databaseLocation = properties.get(LOCATION_PROPERTY);
168+
checkArgument(databaseLocation != null, "location must be set for %s.%s", ns, tableName);
169+
170+
if (databaseLocation.endsWith("/")) {
171+
return databaseLocation + tableName;
172+
} else {
173+
return databaseLocation + "/" + tableName;
174+
}
136175
}
137176

138-
default Map<String, Object> loadNamespaceMetadata(Namespace ns) {
177+
default Map<String, String> loadNamespaceMetadata(Namespace ns) {
139178
return new HashMap<>();
140179
}
141180

@@ -152,7 +191,7 @@ default Map<String, Partition> getPartitions(IcebergTable icebergTable, long sna
152191
Table nativeTable = icebergTable.getNativeTable();
153192
Map<String, Partition> partitionMap = Maps.newHashMap();
154193
PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils.
155-
createMetadataTableInstance(nativeTable, org.apache.iceberg.MetadataTableType.PARTITIONS);
194+
createMetadataTableInstance(nativeTable, MetadataTableType.PARTITIONS);
156195
TableScan tableScan = partitionsTable.newScan();
157196
if (snapshotId != -1) {
158197
tableScan = tableScan.useSnapshot(snapshotId);

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ public void createView(CreateViewStmt stmt) throws DdlException {
306306
}
307307

308308
ConnectorViewDefinition viewDefinition = ConnectorViewDefinition.fromCreateViewStmt(stmt);
309-
icebergCatalog.createView(viewDefinition, stmt.isReplace());
309+
icebergCatalog.createView(catalogName, viewDefinition, stmt.isReplace());
310310
}
311311

312312
@Override

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/hive/IcebergHiveCatalog.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.google.common.annotations.VisibleForTesting;
1818
import com.google.common.base.Preconditions;
1919
import com.google.common.base.Strings;
20+
import com.google.common.collect.ImmutableMap;
2021
import com.google.common.collect.Maps;
2122
import com.starrocks.catalog.Database;
2223
import com.starrocks.common.Config;
@@ -41,6 +42,8 @@
4142
import org.apache.iceberg.catalog.Namespace;
4243
import org.apache.iceberg.catalog.TableIdentifier;
4344
import org.apache.iceberg.hive.HiveCatalog;
45+
import org.apache.iceberg.view.View;
46+
import org.apache.iceberg.view.ViewBuilder;
4447
import org.apache.logging.log4j.LogManager;
4548
import org.apache.logging.log4j.Logger;
4649

@@ -52,6 +55,7 @@
5255
import java.util.stream.Collectors;
5356

5457
import static com.starrocks.connector.ConnectorTableId.CONNECTOR_ID_GENERATOR;
58+
import static com.starrocks.connector.iceberg.IcebergApiConverter.convertDbNameToNamespace;
5559
import static com.starrocks.connector.iceberg.IcebergCatalogProperties.HIVE_METASTORE_TIMEOUT;
5660
import static com.starrocks.connector.iceberg.IcebergCatalogProperties.HIVE_METASTORE_URIS;
5761
import static com.starrocks.connector.iceberg.IcebergCatalogProperties.ICEBERG_METASTORE_URIS;
@@ -93,6 +97,12 @@ public IcebergHiveCatalog(String name, Configuration conf, Map<String, String> p
9397
delegate = (HiveCatalog) CatalogUtil.loadCatalog(HiveCatalog.class.getName(), name, copiedProperties, conf);
9498
}
9599

100+
@VisibleForTesting
101+
public IcebergHiveCatalog(HiveCatalog hiveCatalog, Configuration conf) {
102+
this.delegate = hiveCatalog;
103+
this.conf = conf;
104+
}
105+
96106
@Override
97107
public IcebergCatalogType getIcebergCatalogType() {
98108
return IcebergCatalogType.HIVE_CATALOG;
@@ -201,6 +211,26 @@ public void renameTable(String dbName, String tblName, String newTblName) throws
201211
delegate.renameTable(TableIdentifier.of(dbName, tblName), TableIdentifier.of(dbName, newTblName));
202212
}
203213

214+
@Override
215+
public ViewBuilder getViewBuilder(TableIdentifier identifier) {
216+
return delegate.buildView(identifier);
217+
}
218+
219+
@Override
220+
public boolean dropView(String dbName, String viewName) {
221+
return delegate.dropView(TableIdentifier.of(convertDbNameToNamespace(dbName), viewName));
222+
}
223+
224+
@Override
225+
public View getView(String dbName, String viewName) {
226+
return delegate.loadView(TableIdentifier.of(convertDbNameToNamespace(dbName), viewName));
227+
}
228+
229+
@Override
230+
public Map<String, String> loadNamespaceMetadata(Namespace ns) {
231+
return ImmutableMap.copyOf(delegate.loadNamespaceMetadata(ns));
232+
}
233+
204234
@Override
205235
public void deleteUncommittedDataFiles(List<String> fileLocations) {
206236
if (fileLocations.isEmpty()) {

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/rest/IcebergRESTCatalog.java

Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.starrocks.common.MetaNotFoundException;
2222
import com.starrocks.connector.ConnectorViewDefinition;
2323
import com.starrocks.connector.exception.StarRocksConnectorException;
24-
import com.starrocks.connector.iceberg.IcebergApiConverter;
2524
import com.starrocks.connector.iceberg.IcebergCatalog;
2625
import com.starrocks.connector.iceberg.IcebergCatalogType;
2726
import com.starrocks.connector.iceberg.cost.IcebergMetricsReporter;
@@ -54,7 +53,6 @@
5453
import java.util.Map;
5554
import java.util.stream.Collectors;
5655

57-
import static com.google.common.base.Preconditions.checkArgument;
5856
import static com.starrocks.connector.ConnectorTableId.CONNECTOR_ID_GENERATOR;
5957
import static com.starrocks.connector.iceberg.IcebergApiConverter.convertDbNameToNamespace;
6058
import static com.starrocks.connector.iceberg.IcebergCatalogProperties.ICEBERG_CUSTOM_PROPERTIES_PREFIX;
@@ -226,24 +224,8 @@ public void renameTable(String dbName, String tblName, String newTblName) throws
226224
}
227225

228226
@Override
229-
public boolean createView(ConnectorViewDefinition definition, boolean replace) {
230-
Schema schema = IcebergApiConverter.toIcebergApiSchema(definition.getColumns());
231-
Namespace ns = convertDbNameToNamespace(definition.getDatabaseName());
232-
ViewBuilder viewBuilder = delegate.buildView(TableIdentifier.of(ns, definition.getViewName()));
233-
viewBuilder = viewBuilder.withSchema(schema)
234-
.withQuery("starrocks", definition.getInlineViewDef())
235-
.withDefaultNamespace(ns)
236-
.withDefaultCatalog(definition.getCatalogName())
237-
.withProperties(buildProperties(definition))
238-
.withLocation(defaultTableLocation(ns, definition.getViewName()));
239-
240-
if (replace) {
241-
viewBuilder.createOrReplace();
242-
} else {
243-
viewBuilder.create();
244-
}
245-
246-
return true;
227+
public ViewBuilder getViewBuilder(TableIdentifier identifier) {
228+
return delegate.buildView(identifier);
247229
}
248230

249231
@Override
@@ -279,20 +261,7 @@ public String toString() {
279261
}
280262

281263
@Override
282-
public String defaultTableLocation(Namespace ns, String tableName) {
283-
Map<String, String> properties = delegate.loadNamespaceMetadata(ns);
284-
String databaseLocation = properties.get(LOCATION_PROPERTY);
285-
checkArgument(databaseLocation != null, "location must be set for %s.%s", ns, tableName);
286-
287-
if (databaseLocation.endsWith("/")) {
288-
return databaseLocation + tableName;
289-
} else {
290-
return databaseLocation + "/" + tableName;
291-
}
292-
}
293-
294-
@Override
295-
public Map<String, Object> loadNamespaceMetadata(Namespace ns) {
264+
public Map<String, String> loadNamespaceMetadata(Namespace ns) {
296265
return ImmutableMap.copyOf(delegate.loadNamespaceMetadata(ns));
297266
}
298267

0 commit comments

Comments
 (0)