Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter;

Expand Down Expand Up @@ -90,41 +91,56 @@ public void executeScript(String scriptFilePath) throws SQLException {
}

/**
* Executes SELECT Query
* Executes SELECT Query and returns the results after applying a transformer
*
* @param query : Query to executed
* @param entityClass : Class of the entity being selected
* @param transformer : Transformation of entity class to Result class
* @param entityFilter : Filter to applied on the Result class
* @param limit : Limit to to enforced.
* @return List of Result class objects
* @param <T> : Entity class
* @param <R> : Result class
* @param converterInstance : An instance of the type being selected, used to convert to a
* business entity like PolarisBaseEntity
* @param transformer Transformation of entity class to Result class
* @return The list of results yielded by the query
* @param <T> : Persistence entity class
* @param <R> : Business entity class
* @throws SQLException : Exception during the query execution.
*/
public <T, R> List<R> executeSelect(
@Nonnull String query,
@Nonnull Class<T> entityClass,
@Nonnull Function<T, R> transformer,
Predicate<R> entityFilter,
int limit)
@Nonnull Converter<T> converterInstance,
@Nonnull Function<T, R> transformer)
Comment on lines +107 to +108
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we could construct a "result entity" directly from the ResultSet. There is no transformer is required, and converter will need to take one more type mark like Converter<T, R>. Not a blocker though. We could make that change later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea, I think we'd have to refactor Converter a little bit so that we could just call converterInstance.convert here or something. Might leave it for now since the goal here is just to pull the logic out of executeSelectOverStream

throws SQLException {
ArrayList<R> results = new ArrayList<>();
executeSelectOverStream(
query, converterInstance, stream -> stream.map(transformer).forEach(results::add));
return results;
}

/**
* Executes SELECT Query and takes a consumer over the results. For callers that want more
* sophisticated control over how query results are handled.
*
* @param query : Query to executed
* @param converterInstance : An entity of the type being selected
* @param consumer : An function to consume the returned results
* @param <T> : Entity class
* @throws SQLException : Exception during the query execution.
*/
public <T> void executeSelectOverStream(
@Nonnull String query,
@Nonnull Converter<T> converterInstance,
@Nonnull Consumer<Stream<T>> consumer)
throws SQLException {
try (Connection connection = borrowConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(query)) {
List<R> resultList = new ArrayList<>();
while (resultSet.next() && resultList.size() < limit) {
Converter<T> object =
(Converter<T>)
entityClass.getDeclaredConstructor().newInstance(); // Create a new instance
R entity = transformer.apply(object.fromResultSet(resultSet));
if (entityFilter == null || entityFilter.test(entity)) {
resultList.add(entity);
}
}
return resultList;
ResultSetIterator<T> iterator = new ResultSetIterator<>(resultSet, converterInstance);
consumer.accept(iterator.toStream());
} catch (SQLException e) {
throw e;
} catch (RuntimeException e) {
if (e.getCause() instanceof SQLException) {
throw (SQLException) e.getCause();
} else {
throw e;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,7 @@ public PolarisBaseEntity lookupEntityByName(
private PolarisBaseEntity getPolarisBaseEntity(String query) {
try {
List<PolarisBaseEntity> results =
datasourceOperations.executeSelect(
query, ModelEntity.class, ModelEntity::toEntity, null, Integer.MAX_VALUE);
datasourceOperations.executeSelect(query, new ModelEntity(), ModelEntity::toEntity);
if (results.isEmpty()) {
return null;
} else if (results.size() > 1) {
Expand All @@ -322,8 +321,7 @@ public List<PolarisBaseEntity> lookupEntities(
if (entityIds == null || entityIds.isEmpty()) return new ArrayList<>();
String query = generateSelectQueryWithEntityIds(realmId, entityIds);
try {
return datasourceOperations.executeSelect(
query, ModelEntity.class, ModelEntity::toEntity, null, Integer.MAX_VALUE);
return datasourceOperations.executeSelect(query, new ModelEntity(), ModelEntity::toEntity);
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e);
Expand Down Expand Up @@ -412,9 +410,17 @@ public <T> List<T> listEntities(
// absence of transaction.
String query = QueryGenerator.generateSelectQuery(new ModelEntity(), params);
try {
List<PolarisBaseEntity> results =
datasourceOperations.executeSelect(
query, ModelEntity.class, ModelEntity::toEntity, entityFilter, limit);
List<PolarisBaseEntity> results = new ArrayList<>();
datasourceOperations.executeSelectOverStream(
query,
new ModelEntity(),
stream -> {
stream
.map(ModelEntity::toEntity)
.filter(entityFilter)
.limit(limit)
.forEach(results::add);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to do toList() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the loan pattern, we aren't returning anything so we need to get the list as a side effect. Unfortunately, there is a restriction in Java that variables pulled in to the scope of a lambda must be "effectively final", so we can't do like results = ...toList() in the lambda

});
return results == null
? Collections.emptyList()
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
Expand Down Expand Up @@ -461,11 +467,7 @@ public PolarisGrantRecord lookupGrantRecord(
try {
List<PolarisGrantRecord> results =
datasourceOperations.executeSelect(
query,
ModelGrantRecord.class,
ModelGrantRecord::toGrantRecord,
null,
Integer.MAX_VALUE);
query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord);
if (results.size() > 1) {
throw new IllegalStateException(
String.format(
Expand Down Expand Up @@ -496,11 +498,7 @@ public List<PolarisGrantRecord> loadAllGrantRecordsOnSecurable(
try {
List<PolarisGrantRecord> results =
datasourceOperations.executeSelect(
query,
ModelGrantRecord.class,
ModelGrantRecord::toGrantRecord,
null,
Integer.MAX_VALUE);
query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord);
return results == null ? Collections.emptyList() : results;
} catch (SQLException e) {
throw new RuntimeException(
Expand All @@ -522,11 +520,7 @@ public List<PolarisGrantRecord> loadAllGrantRecordsOnGrantee(
try {
List<PolarisGrantRecord> results =
datasourceOperations.executeSelect(
query,
ModelGrantRecord.class,
ModelGrantRecord::toGrantRecord,
null,
Integer.MAX_VALUE);
query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord);
return results == null ? Collections.emptyList() : results;
} catch (SQLException e) {
throw new RuntimeException(
Expand All @@ -553,8 +547,7 @@ public boolean hasChildren(
String query = generateSelectQuery(new ModelEntity(), params);
try {
List<ModelEntity> results =
datasourceOperations.executeSelect(
query, ModelEntity.class, Function.identity(), null, Integer.MAX_VALUE);
datasourceOperations.executeSelect(query, new ModelEntity(), Function.identity());
return results != null && !results.isEmpty();
} catch (SQLException e) {
throw new RuntimeException(
Expand All @@ -574,10 +567,8 @@ public PolarisPrincipalSecrets loadPrincipalSecrets(
List<PolarisPrincipalSecrets> results =
datasourceOperations.executeSelect(
query,
ModelPrincipalAuthenticationData.class,
ModelPrincipalAuthenticationData::toPrincipalAuthenticationData,
null,
Integer.MAX_VALUE);
new ModelPrincipalAuthenticationData(),
ModelPrincipalAuthenticationData::toPrincipalAuthenticationData);
return results == null || results.isEmpty() ? null : results.getFirst();
} catch (SQLException e) {
LOGGER.error(
Expand Down Expand Up @@ -880,10 +871,8 @@ private List<PolarisPolicyMappingRecord> fetchPolicyMappingRecords(String query)
List<PolarisPolicyMappingRecord> results =
datasourceOperations.executeSelect(
query,
ModelPolicyMappingRecord.class,
ModelPolicyMappingRecord::toPolicyMappingRecord,
null,
Integer.MAX_VALUE);
new ModelPolicyMappingRecord(),
ModelPolicyMappingRecord::toPolicyMappingRecord);
return results == null ? Collections.emptyList() : results;
} catch (SQLException e) {
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.extension.persistence.relational.jdbc;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter;

/**
* Used to wrap a ResultSet and to build a stream from the data it contains. This data structure
* will not close the ResultSet passed in, so the caller is still responsible for managing its
* lifecycle
*/
public class ResultSetIterator<T> implements Iterator<T> {
private final ResultSet resultSet;
private final Converter<T> converterInstance;
private boolean hasNext;

public ResultSetIterator(ResultSet resultSet, Converter<T> converterInstance)
throws SQLException {
this.resultSet = resultSet;
this.converterInstance = converterInstance;
advance();
}

private void advance() throws SQLException {
try {
hasNext = resultSet.next();
} catch (SQLException e) {
hasNext = false;
throw e;
}
}

@Override
public boolean hasNext() {
return hasNext;
}

@Override
public T next() {
if (!hasNext) {
throw new NoSuchElementException();
}
try {
T object = converterInstance.fromResultSet(resultSet);
advance();
return object;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public Stream<T> toStream() {
Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(this, 0);
return StreamSupport.stream(spliterator, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.function.Function;
import javax.sql.DataSource;
import org.apache.polaris.extension.persistence.relational.jdbc.DatasourceOperations;
import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -76,9 +77,7 @@ void testExecuteSelect_exception() throws Exception {

assertThrows(
SQLException.class,
() ->
datasourceOperations.executeSelect(
query, Object.class, Function.identity(), null, Integer.MAX_VALUE));
() -> datasourceOperations.executeSelect(query, new ModelEntity(), Function.identity()));
}

@Test
Expand Down