Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ public class IcebergJdbcMetaStoreProperties extends AbstractIcebergProperties {
)
private String jdbcStrictMode;

@ConnectorProperty(
names = {"iceberg.jdbc.catalog_name"},
required = true,
description = "The Iceberg JDBC catalog_name used to isolate metadata in JDBC catalog tables."
)
private String jdbcCatalogName;

@ConnectorProperty(
names = {"iceberg.jdbc.driver_url"},
required = false,
Expand Down Expand Up @@ -142,7 +149,8 @@ public Catalog initCatalog(String catalogName, Map<String, String> catalogProps,
registerJdbcDriver(driverUrl, driverClass);
LOG.info("Using dynamic JDBC driver from: {}", driverUrl);
}
return buildIcebergCatalog(catalogName, catalogProps, configuration);
catalogProps.remove("iceberg.jdbc.catalog_name");
return buildIcebergCatalog(jdbcCatalogName, catalogProps, configuration);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,52 @@

package org.apache.doris.datasource.property.metastore;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class IcebergJdbcMetaStorePropertiesTest {

private static class CapturingIcebergJdbcMetaStoreProperties extends IcebergJdbcMetaStoreProperties {
private String capturedCatalogName;
private Map<String, String> capturedOptions;

CapturingIcebergJdbcMetaStoreProperties(Map<String, String> props) {
super(props);
}

@Override
protected Catalog buildIcebergCatalog(String catalogName, Map<String, String> options, Configuration conf) {
capturedCatalogName = catalogName;
capturedOptions = new HashMap<>(options);
return Mockito.mock(Catalog.class);
}

String getCapturedCatalogName() {
return capturedCatalogName;
}

Map<String, String> getCapturedOptions() {
return capturedOptions;
}
}

@Test
public void testBasicJdbcProperties() {
Map<String, String> props = new HashMap<>();
props.put("uri", "jdbc:mysql://localhost:3306/iceberg");
props.put("warehouse", "s3://warehouse/path");
props.put("jdbc.user", "iceberg");
props.put("jdbc.password", "secret");
props.put("iceberg.jdbc.catalog_name", "iceberg_catalog");

IcebergJdbcMetaStoreProperties jdbcProps = new IcebergJdbcMetaStoreProperties(props);
jdbcProps.initNormalizeAndCheckProps();
Expand All @@ -53,6 +82,7 @@ public void testJdbcPrefixPassthrough() {
props.put("warehouse", "s3://warehouse/path");
props.put("jdbc.useSSL", "true");
props.put("jdbc.verifyServerCertificate", "true");
props.put("iceberg.jdbc.catalog_name", "iceberg_catalog");

IcebergJdbcMetaStoreProperties jdbcProps = new IcebergJdbcMetaStoreProperties(props);
jdbcProps.initNormalizeAndCheckProps();
Expand All @@ -79,4 +109,46 @@ public void testMissingUri() {
IcebergJdbcMetaStoreProperties jdbcProps = new IcebergJdbcMetaStoreProperties(props);
Assertions.assertThrows(IllegalArgumentException.class, jdbcProps::initNormalizeAndCheckProps);
}

@Test
public void testJdbcCatalogNameOverridesSdkCatalogName() {
Map<String, String> props = createBaseProps();
props.put("iceberg.jdbc.catalog_name", "spark_catalog");

CapturingIcebergJdbcMetaStoreProperties jdbcProps = new CapturingIcebergJdbcMetaStoreProperties(props);
jdbcProps.initNormalizeAndCheckProps();
jdbcProps.initializeCatalog("doris_catalog", Collections.emptyList());

Assertions.assertEquals("spark_catalog", jdbcProps.getCapturedCatalogName());
Assertions.assertFalse(jdbcProps.getCapturedOptions().containsKey("iceberg.jdbc.catalog_name"));
}

@Test
public void testMissingJdbcCatalogNameThrowsException() {
CapturingIcebergJdbcMetaStoreProperties jdbcProps =
new CapturingIcebergJdbcMetaStoreProperties(createBaseProps());

IllegalArgumentException exception = Assertions.assertThrows(
IllegalArgumentException.class, jdbcProps::initNormalizeAndCheckProps);
Assertions.assertEquals("Property iceberg.jdbc.catalog_name is required.", exception.getMessage());
}

@Test
public void testBlankJdbcCatalogNameThrowsException() {
Map<String, String> props = createBaseProps();
props.put("iceberg.jdbc.catalog_name", " ");

CapturingIcebergJdbcMetaStoreProperties jdbcProps = new CapturingIcebergJdbcMetaStoreProperties(props);

IllegalArgumentException exception = Assertions.assertThrows(
IllegalArgumentException.class, jdbcProps::initNormalizeAndCheckProps);
Assertions.assertEquals("Property iceberg.jdbc.catalog_name is required.", exception.getMessage());
}

private static Map<String, String> createBaseProps() {
Map<String, String> props = new HashMap<>();
props.put("uri", "jdbc:mysql://localhost:3306/iceberg");
props.put("warehouse", "s3://warehouse/path");
return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ suite("test_iceberg_jdbc_catalog", "p0,external,iceberg,external_docker,external
return;
}

String catalog_name = "test_iceberg_jdbc_catalog"
String catalog_name = "test_iceberg_jdbc_catalog_doris"
String iceberg_jdbc_catalog_name = "test_iceberg_jdbc_catalog"
String db_name = "jdbc_test_db"
String driver_name = "postgresql-42.5.0.jar"
String driver_download_url = "${getS3Url()}/regression/jdbc_driver/${driver_name}"
Expand Down Expand Up @@ -116,6 +117,7 @@ suite("test_iceberg_jdbc_catalog", "p0,external,iceberg,external_docker,external
'iceberg.jdbc.driver_class' = 'org.postgresql.Driver',
'iceberg.jdbc.user' = 'postgres',
'iceberg.jdbc.password' = '123456',
'iceberg.jdbc.catalog_name' = '${iceberg_jdbc_catalog_name}',
'iceberg.jdbc.init-catalog-tables' = 'true',
'iceberg.jdbc.schema-version' = 'V1',
's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
Expand Down Expand Up @@ -237,7 +239,8 @@ suite("test_iceberg_jdbc_catalog", "p0,external,iceberg,external_docker,external
String cleanupCmd = "mysql -h ${externalEnvIp} -P ${mysql_port} -u root -p123456 -e 'DROP DATABASE IF EXISTS iceberg_db; CREATE DATABASE iceberg_db;'"
executeCommand(cleanupCmd, false)

String mysql_catalog_name = "iceberg_jdbc_mysql"
String mysql_catalog_name = "iceberg_jdbc_mysql_doris"
String iceberg_jdbc_mysql_catalog_name = "iceberg_jdbc_mysql"
try {
sql """DROP CATALOG IF EXISTS ${mysql_catalog_name}"""
sql """
Expand All @@ -250,6 +253,7 @@ suite("test_iceberg_jdbc_catalog", "p0,external,iceberg,external_docker,external
'iceberg.jdbc.driver_class' = 'com.mysql.jdbc.Driver',
'iceberg.jdbc.user' = 'root',
'iceberg.jdbc.password' = '123456',
'iceberg.jdbc.catalog_name' = '${iceberg_jdbc_mysql_catalog_name}',
'iceberg.jdbc.init-catalog-tables' = 'true',
'iceberg.jdbc.schema-version' = 'V1',
's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
Expand Down
Loading