Skip to content

Commit 33518bb

Browse files
authored
[AMORO-3800] Unify the configuration name for AMS URI (#3801)
* Unify the configuration name for AMS URI * Rollback some changes * Fix a unit test error * Fix some issues in unit tests * Fix some checkstyle errors
1 parent 43c2ee1 commit 33518bb

File tree

48 files changed

+209
-335
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+209
-335
lines changed

amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public class AmsThriftUrl {
4040
public static final String PARAM_SOCKET_TIMEOUT = "socketTimeout";
4141
public static final int DEFAULT_SOCKET_TIMEOUT = 5000;
4242
public static final String ZOOKEEPER_FLAG = "zookeeper";
43-
public static final String THRIFT_FLAG = "thrift";
4443
public static final String THRIFT_URL_FORMAT = "thrift://%s:%d/%s%s";
4544
public static final int MAX_RETRIES = 3;
4645
private static final Logger logger = LoggerFactory.getLogger(AmsThriftUrl.class);

amoro-common/src/main/java/org/apache/amoro/properties/CatalogMetaProperties.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public class CatalogMetaProperties {
6969
public static final long CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT =
7070
TimeUnit.MINUTES.toMillis(5);
7171

72-
// only used for unified catalog
7372
public static final String AMS_URI = "ams.uri";
7473

7574
// only used for engine properties

amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import java.util.Map;
3939
import java.util.Set;
4040

41-
/** Catalogs, create mixed-format catalog from metastore thrift url. */
41+
/** Catalogs, create mixed-format catalog from metastore thrift uri. */
4242
public class CatalogLoader {
4343

4444
public static final String INTERNAL_CATALOG_IMPL = InternalMixedIcebergCatalog.class.getName();
@@ -48,27 +48,27 @@ public class CatalogLoader {
4848
/**
4949
* Entrypoint for loading Catalog.
5050
*
51-
* @param catalogUrl mixed-format catalog url, thrift://ams-host:port/catalog_name
51+
* @param catalogUri mixed-format catalog uri, thrift://ams-host:port/catalog_name
5252
* @param properties client side catalog configs
5353
* @return mixed-format catalog object
5454
*/
55-
public static MixedFormatCatalog load(String catalogUrl, Map<String, String> properties) {
56-
AmsThriftUrl url = AmsThriftUrl.parse(catalogUrl, Constants.THRIFT_TABLE_SERVICE_NAME);
55+
public static MixedFormatCatalog load(String catalogUri, Map<String, String> properties) {
56+
AmsThriftUrl url = AmsThriftUrl.parse(catalogUri, Constants.THRIFT_TABLE_SERVICE_NAME);
5757
if (url.catalogName() == null || url.catalogName().contains("/")) {
5858
throw new IllegalArgumentException("invalid catalog name " + url.catalogName());
5959
}
6060

61-
return loadCatalog(catalogUrl, url.catalogName(), properties);
61+
return loadCatalog(catalogUri, url.catalogName(), properties);
6262
}
6363

6464
/**
6565
* Entrypoint for loading catalog.
6666
*
67-
* @param catalogUrl mixed-format catalog url, thrift://ams-host:port/catalog_name
67+
* @param catalogUri mixed-format catalog uri, thrift://ams-host:port/catalog_name
6868
* @return mixed-format catalog object
6969
*/
70-
public static MixedFormatCatalog load(String catalogUrl) {
71-
return load(catalogUrl, Maps.newHashMap());
70+
public static MixedFormatCatalog load(String catalogUri) {
71+
return load(catalogUri, Maps.newHashMap());
7272
}
7373

7474
/**
@@ -106,15 +106,15 @@ private static String catalogImpl(String metastoreType, Map<String, String> cata
106106
/**
107107
* Load catalog meta from metastore.
108108
*
109-
* @param catalogUrl - catalog url
109+
* @param catalogUri - catalog uri
110110
* @return catalog meta
111111
*/
112-
public static CatalogMeta loadMeta(String catalogUrl) {
113-
AmsThriftUrl url = AmsThriftUrl.parse(catalogUrl, Constants.THRIFT_TABLE_SERVICE_NAME);
112+
public static CatalogMeta loadMeta(String catalogUri) {
113+
AmsThriftUrl url = AmsThriftUrl.parse(catalogUri, Constants.THRIFT_TABLE_SERVICE_NAME);
114114
if (url.catalogName() == null || url.catalogName().contains("/")) {
115115
throw new IllegalArgumentException("invalid catalog name " + url.catalogName());
116116
}
117-
AmsClient client = new PooledAmsClient(catalogUrl);
117+
AmsClient client = new PooledAmsClient(catalogUri);
118118
try {
119119
return client.getCatalog(url.catalogName());
120120
} catch (TException e) {
@@ -125,18 +125,18 @@ public static CatalogMeta loadMeta(String catalogUrl) {
125125
/**
126126
* Entrypoint for loading catalog
127127
*
128-
* @param metaStoreUrl mixed-format metastore url
128+
* @param metaStoreUri mixed-format metastore uri
129129
* @param catalogName mixed-format catalog name
130130
* @param properties client side catalog configs
131131
* @return mixed-format catalog object
132132
*/
133133
private static MixedFormatCatalog loadCatalog(
134-
String metaStoreUrl, String catalogName, Map<String, String> properties) {
135-
AmsClient client = new PooledAmsClient(metaStoreUrl);
134+
String metaStoreUri, String catalogName, Map<String, String> properties) {
135+
AmsClient client = new PooledAmsClient(metaStoreUri);
136136
try {
137137
CatalogMeta catalogMeta = client.getCatalog(catalogName);
138138
String type = catalogMeta.getCatalogType();
139-
catalogMeta.putToCatalogProperties(CatalogMetaProperties.AMS_URI, metaStoreUrl);
139+
catalogMeta.putToCatalogProperties(CatalogMetaProperties.AMS_URI, metaStoreUri);
140140
MixedFormatCatalogUtil.mergeCatalogProperties(catalogMeta, properties);
141141
return createCatalog(
142142
catalogName,

amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/CatalogTestBase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,16 @@ public void dropCatalog() {
7474

7575
protected MixedFormatCatalog getMixedFormatCatalog() {
7676
if (mixedFormatCatalog == null) {
77-
mixedFormatCatalog = CatalogLoader.load(getCatalogUrl());
77+
mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
7878
}
7979
return mixedFormatCatalog;
8080
}
8181

8282
protected void refreshMixedFormatCatalog() {
83-
this.mixedFormatCatalog = CatalogLoader.load(getCatalogUrl());
83+
this.mixedFormatCatalog = CatalogLoader.load(getCatalogUri());
8484
}
8585

86-
protected String getCatalogUrl() {
86+
protected String getCatalogUri() {
8787
return TEST_AMS.getServerUrl() + "/" + catalogMeta.getCatalogName();
8888
}
8989

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/InternalCatalogBuilder.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@
5050
public class InternalCatalogBuilder implements Serializable {
5151
private static final Logger LOG = LoggerFactory.getLogger(InternalCatalogBuilder.class);
5252

53-
private String metastoreUrl;
53+
private String amsUri;
5454
private Map<String, String> properties = new HashMap<>(0);
5555
private String catalogName;
5656

5757
private MixedFormatCatalog createMixedFormatCatalog() {
58-
if (metastoreUrl != null) {
59-
return CatalogLoader.load(metastoreUrl, properties);
58+
if (amsUri != null) {
59+
return CatalogLoader.load(amsUri, properties);
6060
} else {
6161
Preconditions.checkArgument(catalogName != null, "Catalog name cannot be empty");
6262
String metastoreType = properties.get(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE);
@@ -101,23 +101,25 @@ private static Configuration mergeHiveConf(
101101
}
102102

103103
if (!Strings.isNullOrEmpty(hadoopConfDir)) {
104+
java.nio.file.Path hdfsSiteFile = Paths.get(hadoopConfDir, "hdfs-site.xml");
104105
Preconditions.checkState(
105-
Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
106+
Files.exists(hdfsSiteFile),
106107
"Failed to load Hadoop configuration: missing %s",
107-
Paths.get(hadoopConfDir, "hdfs-site.xml"));
108+
hdfsSiteFile);
108109
newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
110+
java.nio.file.Path coreSiteFile = Paths.get(hadoopConfDir, "core-site.xml");
109111
Preconditions.checkState(
110-
Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
112+
Files.exists(coreSiteFile),
111113
"Failed to load Hadoop configuration: missing %s",
112-
Paths.get(hadoopConfDir, "core-site.xml"));
114+
coreSiteFile);
113115
newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
114116
}
115117

116118
return newConf;
117119
}
118120

119-
public String getMetastoreUrl() {
120-
return metastoreUrl;
121+
public String getAmsUri() {
122+
return amsUri;
121123
}
122124

123125
public Map<String, String> getProperties() {
@@ -134,8 +136,8 @@ public MixedFormatCatalog build() {
134136
return createMixedFormatCatalog();
135137
}
136138

137-
public InternalCatalogBuilder metastoreUrl(String metastoreUrl) {
138-
this.metastoreUrl = metastoreUrl;
139+
public InternalCatalogBuilder amsUri(String amsUri) {
140+
this.amsUri = amsUri;
139141
return this;
140142
}
141143

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/FlinkUnifiedCatalog.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
3232
import org.apache.amoro.flink.catalog.factories.FlinkUnifiedCatalogFactory;
3333
import org.apache.amoro.flink.catalog.factories.iceberg.IcebergFlinkCatalogFactory;
34-
import org.apache.amoro.flink.catalog.factories.mixed.MixedCatalogFactory;
34+
import org.apache.amoro.flink.catalog.factories.mixed.MixedHiveCatalogFactory;
35+
import org.apache.amoro.flink.catalog.factories.mixed.MixedIcebergCatalogFactory;
3536
import org.apache.amoro.flink.catalog.factories.paimon.PaimonFlinkCatalogFactory;
3637
import org.apache.amoro.flink.table.UnifiedDynamicTableFactory;
3738
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
@@ -493,8 +494,10 @@ private AmoroTable<?> loadAmoroTable(ObjectPath tablePath) {
493494
private AbstractCatalog createOriginalCatalog(
494495
TableIdentifier tableIdentifier, TableFormat tableFormat) {
495496
CatalogFactory catalogFactory;
496-
if (tableFormat.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
497-
catalogFactory = new MixedCatalogFactory();
497+
if (tableFormat.equals(TableFormat.MIXED_ICEBERG)) {
498+
catalogFactory = new MixedIcebergCatalogFactory();
499+
} else if (tableFormat.equals(TableFormat.MIXED_HIVE)) {
500+
catalogFactory = new MixedHiveCatalogFactory();
498501
} else if (tableFormat.equals(TableFormat.ICEBERG)) {
499502
catalogFactory = new IcebergFlinkCatalogFactory(hadoopConf);
500503
} else if (tableFormat.equals(TableFormat.PAIMON)) {

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/MixedCatalog.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ public class MixedCatalog extends AbstractCatalog {
102102
public static final String DEFAULT_DB = "default";
103103

104104
/**
105-
* To distinguish 'CREATE TABLE LIKE' by checking stack {@link
106-
* org.apache.flink.table.planner.operations.SqlCreateTableConverter#lookupLikeSourceTable}
105+
* To distinguish 'CREATE TABLE LIKE' by checking stack
106+
* org.apache.flink.table.planner.operations.SqlCreateTableConverter#lookupLikeSourceTable
107107
*/
108108
public static final String SQL_LIKE_METHOD = "lookupLikeSourceTable";
109109

@@ -242,7 +242,7 @@ private void fillTableMetaPropertiesIfLookupLike(
242242
properties.put(MixedFormatValidator.MIXED_FORMAT_CATALOG.key(), tableIdentifier.getCatalog());
243243
properties.put(MixedFormatValidator.MIXED_FORMAT_TABLE.key(), tableIdentifier.getTableName());
244244
properties.put(MixedFormatValidator.MIXED_FORMAT_DATABASE.key(), tableIdentifier.getDatabase());
245-
properties.put(CatalogFactoryOptions.METASTORE_URL.key(), catalogBuilder.getMetastoreUrl());
245+
properties.put(CatalogFactoryOptions.AMS_URI.key(), catalogBuilder.getAmsUri());
246246
}
247247

248248
private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/CatalogFactoryOptions.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.amoro.flink.catalog.FlinkUnifiedCatalog;
2424
import org.apache.amoro.flink.catalog.MixedCatalog;
25+
import org.apache.amoro.properties.CatalogMetaProperties;
2526
import org.apache.flink.annotation.Internal;
2627
import org.apache.flink.configuration.ConfigOption;
2728
import org.apache.flink.configuration.ConfigOptions;
@@ -31,11 +32,10 @@
3132
public class CatalogFactoryOptions {
3233
public static final String MIXED_ICEBERG_IDENTIFIER = "mixed_iceberg";
3334
public static final String MIXED_HIVE_IDENTIFIER = "mixed_hive";
34-
@Deprecated public static final String LEGACY_MIXED_IDENTIFIER = "arctic";
3535
public static final String UNIFIED_IDENTIFIER = "unified";
3636

37-
public static final ConfigOption<String> METASTORE_URL =
38-
ConfigOptions.key("metastore.url").stringType().noDefaultValue();
37+
public static final ConfigOption<String> AMS_URI =
38+
ConfigOptions.key(CatalogMetaProperties.AMS_URI).stringType().noDefaultValue();
3939

4040
public static final ConfigOption<String> FLINK_TABLE_FORMATS =
4141
ConfigOptions.key(TABLE_FORMATS)

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/FlinkUnifiedCatalogFactory.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,16 @@ public Catalog createCatalog(Context context) {
7777
context
7878
.getOptions()
7979
.getOrDefault(CommonCatalogOptions.DEFAULT_DATABASE_KEY, MixedCatalog.DEFAULT_DB);
80-
final String metastoreUrl = context.getOptions().get(CatalogFactoryOptions.METASTORE_URL.key());
80+
final String metastoreUri = context.getOptions().get(CatalogFactoryOptions.AMS_URI.key());
8181
final Map<String, String> catalogProperties = getCatalogProperties(context.getOptions());
8282

8383
UnifiedCatalog unifiedCatalog;
84-
if (metastoreUrl != null) {
84+
if (metastoreUri != null) {
8585
String amoroCatalogName =
86-
AmsThriftUrl.parse(metastoreUrl, THRIFT_TABLE_SERVICE_NAME).catalogName();
86+
AmsThriftUrl.parse(metastoreUri, THRIFT_TABLE_SERVICE_NAME).catalogName();
8787
unifiedCatalog =
8888
UnifiedCatalogLoader.loadUnifiedCatalog(
89-
metastoreUrl, amoroCatalogName, catalogProperties);
89+
metastoreUri, amoroCatalogName, catalogProperties);
9090
} else {
9191
String metastoreType = catalogProperties.get(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE);
9292
Preconditions.checkArgument(metastoreType != null, "Catalog type cannot be empty");
@@ -105,7 +105,7 @@ public Catalog createCatalog(Context context) {
105105
validate(tableFormats);
106106

107107
return new FlinkUnifiedCatalog(
108-
metastoreUrl, defaultDatabase, unifiedCatalog, context, hadoopConf);
108+
metastoreUri, defaultDatabase, unifiedCatalog, context, hadoopConf);
109109
}
110110

111111
private void validate(Set<TableFormat> expectedFormats) {

amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/catalog/factories/mixed/MixedCatalogFactory.java

Lines changed: 0 additions & 71 deletions
This file was deleted.

0 commit comments

Comments
 (0)