Skip to content

Commit 0c48970

Browse files
authored
[JDBC] Part3: Plumb JDBC module to Quarkus (#1371)
1 parent 2d564f9 commit 0c48970

File tree

28 files changed

+572
-129
lines changed

28 files changed

+572
-129
lines changed

bom/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ dependencies {
4646
api(project(":polaris-jpa-model"))
4747

4848
api(project(":polaris-quarkus-admin"))
49+
api(project(":polaris-quarkus-test-commons"))
4950
api(project(":polaris-quarkus-defaults"))
5051
api(project(":polaris-quarkus-server"))
5152
api(project(":polaris-quarkus-service"))

extension/persistence/relational-jdbc/build.gradle.kts

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
* under the License.
1818
*/
1919

20-
plugins { id("polaris-server") }
20+
plugins {
21+
id("polaris-server")
22+
alias(libs.plugins.jandex)
23+
}
2124

2225
dependencies {
2326
implementation(project(":polaris-core"))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.extension.persistence.relational.jdbc;
20+
21+
import java.util.Locale;
22+
23+
public enum DatabaseType {
24+
POSTGRES("postgres"),
25+
H2("h2");
26+
27+
private final String displayName; // Store the user-friendly name
28+
29+
DatabaseType(String displayName) {
30+
this.displayName = displayName;
31+
}
32+
33+
// Method to get the user-friendly display name
34+
public String getDisplayName() {
35+
return displayName;
36+
}
37+
38+
public static DatabaseType fromDisplayName(String displayName) {
39+
return switch (displayName.toLowerCase(Locale.ROOT)) {
40+
case "h2" -> DatabaseType.H2;
41+
case "postgresql" -> DatabaseType.POSTGRES;
42+
default -> throw new IllegalStateException("Unsupported DatabaseType: '" + displayName + "'");
43+
};
44+
}
45+
46+
public String getInitScriptResource() {
47+
return String.format("%s/schema-v1.sql", this.getDisplayName());
48+
}
49+
}

extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java

-8
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,9 @@
3535
import java.util.function.Predicate;
3636
import javax.sql.DataSource;
3737
import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter;
38-
import org.slf4j.Logger;
39-
import org.slf4j.LoggerFactory;
4038

4139
public class DatasourceOperations {
42-
private static final Logger LOGGER = LoggerFactory.getLogger(DatasourceOperations.class);
4340

44-
private static final String ALREADY_EXISTS_STATE_POSTGRES = "42P07";
4541
private static final String CONSTRAINT_VIOLATION_SQL_CODE = "23505";
4642

4743
private final DataSource datasource;
@@ -189,10 +185,6 @@ public boolean isConstraintViolation(SQLException e) {
189185
return CONSTRAINT_VIOLATION_SQL_CODE.equals(e.getSQLState());
190186
}
191187

192-
public boolean isAlreadyExistsException(SQLException e) {
193-
return ALREADY_EXISTS_STATE_POSTGRES.equals(e.getSQLState());
194-
}
195-
196188
private Connection borrowConnection() throws SQLException {
197189
return datasource.getConnection();
198190
}

extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java

+65-91
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import jakarta.annotation.Nonnull;
2424
import jakarta.annotation.Nullable;
2525
import java.sql.SQLException;
26+
import java.sql.Statement;
2627
import java.util.ArrayList;
2728
import java.util.Collections;
2829
import java.util.HashMap;
@@ -86,44 +87,14 @@ public void writeEntity(
8687
@Nonnull PolarisBaseEntity entity,
8788
boolean nameOrParentChanged,
8889
PolarisBaseEntity originalEntity) {
89-
ModelEntity modelEntity = ModelEntity.fromEntity(entity);
90-
String query;
91-
if (originalEntity == null) {
92-
try {
93-
query = generateInsertQuery(modelEntity, realmId);
94-
datasourceOperations.executeUpdate(query);
95-
} catch (SQLException e) {
96-
if ((datasourceOperations.isConstraintViolation(e)
97-
|| datasourceOperations.isAlreadyExistsException(e))) {
98-
throw new EntityAlreadyExistsException(entity, e);
99-
} else {
100-
throw new RuntimeException(
101-
String.format("Failed to write entity due to %s", e.getMessage()), e);
102-
}
103-
}
104-
} else {
105-
Map<String, Object> params =
106-
Map.of(
107-
"id",
108-
originalEntity.getId(),
109-
"catalog_id",
110-
originalEntity.getCatalogId(),
111-
"entity_version",
112-
originalEntity.getEntityVersion(),
113-
"realm_id",
114-
realmId);
115-
query = generateUpdateQuery(modelEntity, params);
116-
try {
117-
int rowsUpdated = datasourceOperations.executeUpdate(query);
118-
if (rowsUpdated == 0) {
119-
throw new RetryOnConcurrencyException(
120-
"Entity '%s' id '%s' concurrently modified; expected version %s",
121-
entity.getName(), entity.getId(), originalEntity.getEntityVersion());
122-
}
123-
} catch (SQLException e) {
124-
throw new RuntimeException(
125-
String.format("Failed to write entity due to %s", e.getMessage()), e);
126-
}
90+
try {
91+
datasourceOperations.runWithinTransaction(
92+
statement -> {
93+
persistEntity(callCtx, entity, originalEntity, statement);
94+
return true;
95+
});
96+
} catch (SQLException e) {
97+
throw new RuntimeException("Error persisting entity", e);
12798
}
12899
}
129100

@@ -137,70 +108,21 @@ public void writeEntities(
137108
statement -> {
138109
for (int i = 0; i < entities.size(); i++) {
139110
PolarisBaseEntity entity = entities.get(i);
140-
ModelEntity modelEntity = ModelEntity.fromEntity(entity);
111+
PolarisBaseEntity originalEntity =
112+
originalEntities != null ? originalEntities.get(i) : null;
141113

142114
// first, check if the entity has already been created, in which case we will simply
143115
// return it.
144116
PolarisBaseEntity entityFound =
145117
lookupEntity(
146118
callCtx, entity.getCatalogId(), entity.getId(), entity.getTypeCode());
147-
if (entityFound != null) {
119+
if (entityFound != null && originalEntity == null) {
148120
// probably the client retried, simply return it
149121
// TODO: Check correctness of returning entityFound vs entity here. It may have
150122
// already been updated after the creation.
151123
continue;
152124
}
153-
// lookup by name
154-
EntityNameLookupRecord exists =
155-
lookupEntityIdAndSubTypeByName(
156-
callCtx,
157-
entity.getCatalogId(),
158-
entity.getParentId(),
159-
entity.getTypeCode(),
160-
entity.getName());
161-
if (exists != null) {
162-
throw new EntityAlreadyExistsException(entity);
163-
}
164-
String query;
165-
if (originalEntities == null || originalEntities.get(i) == null) {
166-
try {
167-
query = generateInsertQuery(modelEntity, realmId);
168-
statement.executeUpdate(query);
169-
} catch (SQLException e) {
170-
if ((datasourceOperations.isConstraintViolation(e)
171-
|| datasourceOperations.isAlreadyExistsException(e))) {
172-
throw new EntityAlreadyExistsException(entity, e);
173-
} else {
174-
throw new RuntimeException(
175-
String.format("Failed to write entity due to %s", e.getMessage()), e);
176-
}
177-
}
178-
} else {
179-
Map<String, Object> params =
180-
Map.of(
181-
"id",
182-
originalEntities.get(i).getId(),
183-
"catalog_id",
184-
originalEntities.get(i).getCatalogId(),
185-
"entity_version",
186-
originalEntities.get(i).getEntityVersion(),
187-
"realm_id",
188-
realmId);
189-
query = generateUpdateQuery(modelEntity, params);
190-
try {
191-
int rowsUpdated = statement.executeUpdate(query);
192-
if (rowsUpdated == 0) {
193-
throw new RetryOnConcurrencyException(
194-
"Entity '%s' id '%s' concurrently modified; expected version %s",
195-
entity.getName(),
196-
entity.getId(),
197-
originalEntities.get(i).getEntityVersion());
198-
}
199-
} catch (SQLException e) {
200-
throw new RuntimeException(
201-
String.format("Failed to write entity due to %s", e.getMessage()), e);
202-
}
203-
}
125+
persistEntity(callCtx, entity, originalEntity, statement);
204126
}
205127
return true;
206128
});
@@ -212,6 +134,56 @@ public void writeEntities(
212134
}
213135
}
214136

137+
private void persistEntity(
138+
@Nonnull PolarisCallContext callCtx,
139+
@Nonnull PolarisBaseEntity entity,
140+
PolarisBaseEntity originalEntity,
141+
Statement statement)
142+
throws SQLException {
143+
ModelEntity modelEntity = ModelEntity.fromEntity(entity);
144+
if (originalEntity == null) {
145+
try {
146+
statement.executeUpdate(generateInsertQuery(modelEntity, realmId));
147+
} catch (SQLException e) {
148+
if (datasourceOperations.isConstraintViolation(e)) {
149+
PolarisBaseEntity existingEntity =
150+
lookupEntityByName(
151+
callCtx,
152+
entity.getCatalogId(),
153+
entity.getParentId(),
154+
entity.getTypeCode(),
155+
entity.getName());
156+
throw new EntityAlreadyExistsException(existingEntity, e);
157+
} else {
158+
throw new RuntimeException(
159+
String.format("Failed to write entity due to %s", e.getMessage()), e);
160+
}
161+
}
162+
} else {
163+
Map<String, Object> params =
164+
Map.of(
165+
"id",
166+
originalEntity.getId(),
167+
"catalog_id",
168+
originalEntity.getCatalogId(),
169+
"entity_version",
170+
originalEntity.getEntityVersion(),
171+
"realm_id",
172+
realmId);
173+
try {
174+
int rowsUpdated = statement.executeUpdate(generateUpdateQuery(modelEntity, params));
175+
if (rowsUpdated == 0) {
176+
throw new RetryOnConcurrencyException(
177+
"Entity '%s' id '%s' concurrently modified; expected version %s",
178+
originalEntity.getName(), originalEntity.getId(), originalEntity.getEntityVersion());
179+
}
180+
} catch (SQLException e) {
181+
throw new RuntimeException(
182+
String.format("Failed to write entity due to %s", e.getMessage()), e);
183+
}
184+
}
185+
}
186+
215187
@Override
216188
public void writeToGrantRecords(
217189
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord grantRec) {
@@ -492,6 +464,8 @@ public PolarisGrantRecord lookupGrantRecord(
492464
throw new IllegalStateException(
493465
String.format(
494466
"More than one grant record %s for a given Grant record", results.getFirst()));
467+
} else if (results.isEmpty()) {
468+
return null;
495469
}
496470
return results.getFirst();
497471
} catch (SQLException e) {

extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java

+14-9
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import io.smallrye.common.annotation.Identifier;
2222
import jakarta.annotation.Nullable;
2323
import jakarta.enterprise.context.ApplicationScoped;
24+
import jakarta.enterprise.inject.Instance;
2425
import jakarta.inject.Inject;
26+
import java.sql.Connection;
2527
import java.sql.SQLException;
2628
import java.util.HashMap;
2729
import java.util.Map;
@@ -37,6 +39,7 @@
3739
import org.apache.polaris.core.entity.PolarisEntitySubType;
3840
import org.apache.polaris.core.entity.PolarisEntityType;
3941
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
42+
import org.apache.polaris.core.persistence.AtomicOperationMetaStoreManager;
4043
import org.apache.polaris.core.persistence.BasePersistence;
4144
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
4245
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
@@ -46,7 +49,6 @@
4649
import org.apache.polaris.core.persistence.dao.entity.BaseResult;
4750
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
4851
import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
49-
import org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreManagerImpl;
5052
import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
5153
import org.apache.polaris.core.storage.cache.StorageCredentialCache;
5254
import org.slf4j.Logger;
@@ -68,10 +70,9 @@ public class JdbcMetaStoreManagerFactory implements MetaStoreManagerFactory {
6870
final Map<String, EntityCache> entityCacheMap = new HashMap<>();
6971
final Map<String, Supplier<BasePersistence>> sessionSupplierMap = new HashMap<>();
7072
protected final PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl();
71-
// TODO: Pending discussion of if we should have one Database per realm or 1 schema per realm
72-
// or realm should be a primary key on all the tables.
73-
@Inject DataSource dataSource;
73+
7474
@Inject PolarisStorageIntegrationProvider storageIntegrationProvider;
75+
@Inject Instance<DataSource> dataSource;
7576

7677
protected JdbcMetaStoreManagerFactory() {}
7778

@@ -86,7 +87,7 @@ protected PrincipalSecretsGenerator secretsGenerator(
8687
}
8788

8889
protected PolarisMetaStoreManager createNewMetaStoreManager() {
89-
return new TransactionalMetaStoreManagerImpl();
90+
return new AtomicOperationMetaStoreManager();
9091
}
9192

9293
private void initializeForRealm(
@@ -106,12 +107,16 @@ private void initializeForRealm(
106107
}
107108

108109
private DatasourceOperations getDatasourceOperations(boolean isBootstrap) {
109-
DatasourceOperations databaseOperations = new DatasourceOperations(dataSource);
110+
DatasourceOperations databaseOperations = new DatasourceOperations(dataSource.get());
110111
if (isBootstrap) {
111-
// TODO: see if we need to take script from Quarkus or can we just
112-
// use the script committed in the repo.
113112
try {
114-
databaseOperations.executeScript("scripts/postgres/schema-v1-postgres.sql");
113+
DatabaseType databaseType;
114+
try (Connection connection = dataSource.get().getConnection()) {
115+
String productName = connection.getMetaData().getDatabaseProductName();
116+
databaseType = DatabaseType.fromDisplayName(productName);
117+
}
118+
databaseOperations.executeScript(
119+
String.format("%s/schema-v1.sql", databaseType.getDisplayName()));
115120
} catch (SQLException e) {
116121
throw new RuntimeException(
117122
String.format("Error executing sql script: %s", e.getMessage()), e);

extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/models/ModelEntity.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,11 @@ public ModelEntity fromResultSet(ResultSet r) throws SQLException {
152152
.purgeTimestamp(r.getObject("purge_timestamp", Long.class))
153153
.toPurgeTimestamp(r.getObject("to_purge_timestamp", Long.class))
154154
.lastUpdateTimestamp(r.getObject("last_update_timestamp", Long.class))
155-
.properties(r.getObject("properties", String.class))
156-
.internalProperties(r.getObject("internal_properties", String.class))
155+
.properties(
156+
r.getString("properties")) // required for extracting when the underlying type is JSONB
157+
.internalProperties(
158+
r.getString(
159+
"internal_properties")) // required for extracting when the underlying type is JSONB
157160
.grantRecordsVersion(r.getObject("grant_records_version", Integer.class))
158161
.build();
159162
}

0 commit comments

Comments
 (0)