-
Notifications
You must be signed in to change notification settings - Fork 305
Lazy iteration over JDBC ResultSet #1487
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
86acd38
d8d4ce2
6ac54f1
f421b8c
32a3b63
4e92053
734428d
7b744ef
5a81a9c
3525af3
1987821
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,8 +31,9 @@ | |
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.function.Consumer; | ||
import java.util.function.Function; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Stream; | ||
import javax.sql.DataSource; | ||
import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter; | ||
|
||
|
@@ -90,41 +91,56 @@ public void executeScript(String scriptFilePath) throws SQLException { | |
} | ||
|
||
/** | ||
* Executes SELECT Query | ||
* Executes SELECT Query and returns the results after applying a transformer | ||
* | ||
* @param query : Query to executed | ||
* @param entityClass : Class of the entity being selected | ||
* @param transformer : Transformation of entity class to Result class | ||
* @param entityFilter : Filter to applied on the Result class | ||
* @param limit : Limit to to enforced. | ||
* @return List of Result class objects | ||
* @param <T> : Entity class | ||
* @param <R> : Result class | ||
* @param converterInstance : An instance of the type being selected, used to convert to a | ||
* business entity like PolarisBaseEntity | ||
* @param transformer Transformation of entity class to Result class | ||
* @return The list of results yielded by the query | ||
* @param <T> : Persistence entity class | ||
* @param <R> : Business entity class | ||
* @throws SQLException : Exception during the query execution. | ||
*/ | ||
public <T, R> List<R> executeSelect( | ||
@Nonnull String query, | ||
@Nonnull Class<T> entityClass, | ||
@Nonnull Function<T, R> transformer, | ||
Predicate<R> entityFilter, | ||
int limit) | ||
@Nonnull Converter<T> converterInstance, | ||
@Nonnull Function<T, R> transformer) | ||
Comment on lines
+107
to
+108
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that we could construct a "result entity" directly from the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good idea, I think we'd have to refactor |
||
throws SQLException { | ||
ArrayList<R> results = new ArrayList<>(); | ||
executeSelectOverStream( | ||
query, converterInstance, stream -> stream.map(transformer).forEach(results::add)); | ||
return results; | ||
} | ||
|
||
/** | ||
* Executes SELECT Query and takes a consumer over the results. For callers that want more | ||
* sophisticated control over how query results are handled. | ||
* | ||
* @param query : Query to executed | ||
* @param converterInstance : An entity of the type being selected | ||
* @param consumer : An function to consume the returned results | ||
* @param <T> : Entity class | ||
* @throws SQLException : Exception during the query execution. | ||
*/ | ||
public <T> void executeSelectOverStream( | ||
@Nonnull String query, | ||
@Nonnull Converter<T> converterInstance, | ||
@Nonnull Consumer<Stream<T>> consumer) | ||
throws SQLException { | ||
try (Connection connection = borrowConnection(); | ||
Statement statement = connection.createStatement(); | ||
ResultSet resultSet = statement.executeQuery(query)) { | ||
List<R> resultList = new ArrayList<>(); | ||
while (resultSet.next() && resultList.size() < limit) { | ||
Converter<T> object = | ||
(Converter<T>) | ||
entityClass.getDeclaredConstructor().newInstance(); // Create a new instance | ||
R entity = transformer.apply(object.fromResultSet(resultSet)); | ||
if (entityFilter == null || entityFilter.test(entity)) { | ||
resultList.add(entity); | ||
} | ||
} | ||
return resultList; | ||
ResultSetIterator<T> iterator = new ResultSetIterator<>(resultSet, converterInstance); | ||
consumer.accept(iterator.toStream()); | ||
} catch (SQLException e) { | ||
throw e; | ||
} catch (RuntimeException e) { | ||
if (e.getCause() instanceof SQLException) { | ||
throw (SQLException) e.getCause(); | ||
} else { | ||
throw e; | ||
} | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -297,8 +297,7 @@ public PolarisBaseEntity lookupEntityByName( | |
private PolarisBaseEntity getPolarisBaseEntity(String query) { | ||
try { | ||
List<PolarisBaseEntity> results = | ||
datasourceOperations.executeSelect( | ||
query, ModelEntity.class, ModelEntity::toEntity, null, Integer.MAX_VALUE); | ||
datasourceOperations.executeSelect(query, new ModelEntity(), ModelEntity::toEntity); | ||
if (results.isEmpty()) { | ||
return null; | ||
} else if (results.size() > 1) { | ||
|
@@ -322,8 +321,7 @@ public List<PolarisBaseEntity> lookupEntities( | |
if (entityIds == null || entityIds.isEmpty()) return new ArrayList<>(); | ||
String query = generateSelectQueryWithEntityIds(realmId, entityIds); | ||
try { | ||
return datasourceOperations.executeSelect( | ||
query, ModelEntity.class, ModelEntity::toEntity, null, Integer.MAX_VALUE); | ||
return datasourceOperations.executeSelect(query, new ModelEntity(), ModelEntity::toEntity); | ||
} catch (SQLException e) { | ||
throw new RuntimeException( | ||
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e); | ||
|
@@ -412,9 +410,17 @@ public <T> List<T> listEntities( | |
// absence of transaction. | ||
String query = QueryGenerator.generateSelectQuery(new ModelEntity(), params); | ||
try { | ||
List<PolarisBaseEntity> results = | ||
datasourceOperations.executeSelect( | ||
query, ModelEntity.class, ModelEntity::toEntity, entityFilter, limit); | ||
List<PolarisBaseEntity> results = new ArrayList<>(); | ||
datasourceOperations.executeSelectOverStream( | ||
query, | ||
new ModelEntity(), | ||
stream -> { | ||
stream | ||
.map(ModelEntity::toEntity) | ||
.filter(entityFilter) | ||
.limit(limit) | ||
.forEach(results::add); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there a way to do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the loan pattern, we aren't returning anything so we need to get the list as a side effect. Unfortunately, there is a restriction in Java that variables pulled in to the scope of a lambda must be "effectively final", so we can't do like |
||
}); | ||
return results == null | ||
? Collections.emptyList() | ||
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList()); | ||
|
@@ -461,11 +467,7 @@ public PolarisGrantRecord lookupGrantRecord( | |
try { | ||
List<PolarisGrantRecord> results = | ||
datasourceOperations.executeSelect( | ||
query, | ||
ModelGrantRecord.class, | ||
ModelGrantRecord::toGrantRecord, | ||
null, | ||
Integer.MAX_VALUE); | ||
query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord); | ||
if (results.size() > 1) { | ||
throw new IllegalStateException( | ||
String.format( | ||
|
@@ -496,11 +498,7 @@ public List<PolarisGrantRecord> loadAllGrantRecordsOnSecurable( | |
try { | ||
List<PolarisGrantRecord> results = | ||
datasourceOperations.executeSelect( | ||
query, | ||
ModelGrantRecord.class, | ||
ModelGrantRecord::toGrantRecord, | ||
null, | ||
Integer.MAX_VALUE); | ||
query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord); | ||
return results == null ? Collections.emptyList() : results; | ||
} catch (SQLException e) { | ||
throw new RuntimeException( | ||
|
@@ -522,11 +520,7 @@ public List<PolarisGrantRecord> loadAllGrantRecordsOnGrantee( | |
try { | ||
List<PolarisGrantRecord> results = | ||
datasourceOperations.executeSelect( | ||
query, | ||
ModelGrantRecord.class, | ||
ModelGrantRecord::toGrantRecord, | ||
null, | ||
Integer.MAX_VALUE); | ||
query, new ModelGrantRecord(), ModelGrantRecord::toGrantRecord); | ||
return results == null ? Collections.emptyList() : results; | ||
} catch (SQLException e) { | ||
throw new RuntimeException( | ||
|
@@ -553,8 +547,7 @@ public boolean hasChildren( | |
String query = generateSelectQuery(new ModelEntity(), params); | ||
try { | ||
List<ModelEntity> results = | ||
datasourceOperations.executeSelect( | ||
query, ModelEntity.class, Function.identity(), null, Integer.MAX_VALUE); | ||
datasourceOperations.executeSelect(query, new ModelEntity(), Function.identity()); | ||
return results != null && !results.isEmpty(); | ||
} catch (SQLException e) { | ||
throw new RuntimeException( | ||
|
@@ -574,10 +567,8 @@ public PolarisPrincipalSecrets loadPrincipalSecrets( | |
List<PolarisPrincipalSecrets> results = | ||
datasourceOperations.executeSelect( | ||
query, | ||
ModelPrincipalAuthenticationData.class, | ||
ModelPrincipalAuthenticationData::toPrincipalAuthenticationData, | ||
null, | ||
Integer.MAX_VALUE); | ||
new ModelPrincipalAuthenticationData(), | ||
ModelPrincipalAuthenticationData::toPrincipalAuthenticationData); | ||
return results == null || results.isEmpty() ? null : results.getFirst(); | ||
} catch (SQLException e) { | ||
LOGGER.error( | ||
|
@@ -880,10 +871,8 @@ private List<PolarisPolicyMappingRecord> fetchPolicyMappingRecords(String query) | |
List<PolarisPolicyMappingRecord> results = | ||
datasourceOperations.executeSelect( | ||
query, | ||
ModelPolicyMappingRecord.class, | ||
ModelPolicyMappingRecord::toPolicyMappingRecord, | ||
null, | ||
Integer.MAX_VALUE); | ||
new ModelPolicyMappingRecord(), | ||
ModelPolicyMappingRecord::toPolicyMappingRecord); | ||
return results == null ? Collections.emptyList() : results; | ||
} catch (SQLException e) { | ||
throw new RuntimeException( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.polaris.extension.persistence.relational.jdbc; | ||
|
||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
import java.util.Iterator; | ||
import java.util.NoSuchElementException; | ||
import java.util.Spliterator; | ||
import java.util.Spliterators; | ||
import java.util.stream.Stream; | ||
import java.util.stream.StreamSupport; | ||
import org.apache.polaris.extension.persistence.relational.jdbc.models.Converter; | ||
|
||
/** | ||
* Used to wrap a ResultSet and to build a stream from the data it contains. This data structure | ||
* will not close the ResultSet passed in, so the caller is still responsible for managing its | ||
* lifecycle | ||
*/ | ||
public class ResultSetIterator<T> implements Iterator<T> { | ||
private final ResultSet resultSet; | ||
private final Converter<T> converterInstance; | ||
private boolean hasNext; | ||
|
||
public ResultSetIterator(ResultSet resultSet, Converter<T> converterInstance) | ||
throws SQLException { | ||
this.resultSet = resultSet; | ||
this.converterInstance = converterInstance; | ||
advance(); | ||
} | ||
|
||
private void advance() throws SQLException { | ||
try { | ||
hasNext = resultSet.next(); | ||
} catch (SQLException e) { | ||
hasNext = false; | ||
throw e; | ||
} | ||
} | ||
|
||
@Override | ||
public boolean hasNext() { | ||
return hasNext; | ||
} | ||
|
||
@Override | ||
public T next() { | ||
if (!hasNext) { | ||
throw new NoSuchElementException(); | ||
} | ||
try { | ||
T object = converterInstance.fromResultSet(resultSet); | ||
advance(); | ||
return object; | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
public Stream<T> toStream() { | ||
Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(this, 0); | ||
return StreamSupport.stream(spliterator, false); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.